RDOO/rdoopy/utils.py

300 lines
11 KiB
Python

#!/usr/bin/env python3
# Copyright 2021 Olav63, SebF
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Collections de méthodes utilitaires"""
import json
import logging
import os
import datetime
import shutil
import pathlib
import csv
from collections import OrderedDict
import requests
from pyexcel_ods3 import save_data
from rdoopy import errors
class Utils:
"""Classe de méthodes utilitaires"""
overpass_url: str
geo_api_url: str
dossier_resultats: str
def __init__(self, overpass_url, geo_api_url, dossier_resultats, timeout):
self.overpass_url = overpass_url
self.geo_api_url = geo_api_url
self.dossier_resultats = dossier_resultats
self.timeout = timeout
with open("configuration/traductions.json", encoding="utf-8") as trads:
self.traductions = json.load(trads)
self.lecture_requetes()
def save_as_ods(self, fields, data, nom_req, ods_data_sheet=OrderedDict()):
"""Sauvegarde de data dans un classeur ods.
Le paramètre ods_data_sheet est évalué une seule fois à la définition de la fonction,
ce qui enregistre les data de chaque appel dans une nouvelle feuille.
"""
# ods_data_sheet = OrderedDict()
ods_data = []
ods_data.append(fields.keys())
index_line = 2
for element in data["elements"]:
line = []
index_col = 0
for field in fields.keys():
if field in element["tags"]:
if field == "capacity":
val = element["tags"][field]
line.append(int(val) if val.isdigit() else val)
else:
line.append(element["tags"][field])
else:
line.append("")
index_col = index_col + 1
ods_data.append(line)
index_line = index_line + 1
ods_data_sheet.update({f"{nom_req}": ods_data})
save_data(self.dossier_resultats + "resultats.ods", ods_data_sheet)
logging.info("Sauvegarde résultats format ODS")
def save_as_json(self, export_json, nom_req):
"""Enregistrement du JSON"""
json_file = open(
self.dossier_resultats + nom_req + ".json", "w", encoding="utf-8"
)
json_file.write(json.dumps(export_json))
json_file.close()
logging.info("Sauvegarde résultat format JSON/OSM")
def export_json_pour_umap(self, data, overpass_query_fields, concatenation):
"""Sélection uniquement des champs export_json == oui"""
export_json = {
"version": data["version"],
"generator": data["generator"] + " and ETALAB API",
"osm3s": data["osm3s"],
"elements": [],
}
index_line = 0
for element in data["elements"]:
export_json["elements"].append(
{"type": element["type"], "id": element["id"], "tags": {}}
)
# positionnement des éléments
if element["type"] == "node": # noeuds
export_json["elements"][index_line]["lat"] = element["lat"]
export_json["elements"][index_line]["lon"] = element["lon"]
else: # ways et relations
export_json["elements"][index_line]["center"] = element["center"]
# filtrage des tags
description = ""
for tag in overpass_query_fields.keys():
if (
overpass_query_fields[tag]["export_json"] == "Oui"
and tag in element["tags"]
):
if concatenation:
ajout = (
str(element["tags"][tag])
if overpass_query_fields[tag]["FR"] == ""
else overpass_query_fields[tag]["FR"]
+ " : "
+ str(element["tags"][tag])
)
description = description + ajout + "\n"
export_json["elements"][index_line]["tags"] = {
"description": description[:-1]
}
else:
tagname = (
tag
if overpass_query_fields[tag]["FR"] == ""
else overpass_query_fields[tag]["FR"]
)
export_json["elements"][index_line]["tags"].update(
{tagname: element["tags"][tag]}
)
index_line = index_line + 1
return export_json
def run_overpass_query(self, critere, aire_de_recherche):
"""Envoie la requête Overpass et retourne la réponse JSON."""
overpass_query = (
f"[out:json][timeout: {str(self.timeout)}];({critere});out center;"
)
overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)
response = requests.get(self.overpass_url, params={"data": overpass_query})
if response.status_code != 200:
raise errors.OverpassError(response.status_code)
return response.json()
def geocodage_csv(self, data):
"""
Renseigne une adresse pour chaque élément de data
en une fois via csv
"""
url = self.geo_api_url + "/reverse/csv/"
# création du fichier à envoyer à l'API
with open(
"tmp_geocodage.csv", "w", newline="", encoding="utf-8"
) as tmp_csv_file:
csv_writer = csv.writer(tmp_csv_file)
csv_writer.writerow(["lat", "lon"])
for element in data["elements"]:
if element["type"] == "node":
csv_writer.writerow([element["lat"], element["lon"]])
else:
csv_writer.writerow(
[element["center"]["lat"], element["center"]["lon"]]
)
# préparation et envoi de la requête
payload = dict(
[("data", ("tmp_geocodage.csv", open("tmp_geocodage.csv", "rb").read()))]
)
response = requests.post(url, files=payload)
# nettoyage
os.remove("tmp_geocodage.csv")
if response.status_code != 200:
raise errors.GeoApiError(response.status_code)
# affectation des addresses
for element in data["elements"]:
for row in csv.DictReader(response.text.splitlines()):
if element["type"] == "node":
lat_ok = row["lat"] == str(element["lat"])
lon_ok = row["lon"] == str(element["lon"])
else:
lat_ok = row["lat"] == str(element["center"]["lat"])
lon_ok = row["lon"] == str(element["center"]["lon"])
if lat_ok and lon_ok:
element["tags"]["api_adresse:geometry:coordinates:lon"] = row[
"result_longitude"
]
element["tags"]["api_adresse:geometry:coordinates:lat"] = row[
"result_latitude"
]
element["tags"]["api_adresse:properties:label"] = row[
"result_label"
]
element["tags"]["api_adresse:properties:housenumber"] = row[
"result_housenumber"
]
element["tags"]["api_adresse:properties:type"] = row["result_type"]
element["tags"]["api_adresse:properties:name"] = row["result_name"]
element["tags"]["api_adresse:properties:postcode"] = row[
"result_postcode"
]
element["tags"]["api_adresse:properties:citycode"] = row[
"result_citycode"
]
element["tags"]["api_adresse:properties:city"] = row["result_city"]
element["tags"]["api_adresse:properties:street"] = row[
"result_street"
]
logging.info("Géocodage inversé terminé")
return data
def traduction(self, tag, dictionnaire, data):
"""Traduit le champ tag des éléments de data avec dict"""
for element in data["elements"]:
if tag in element["tags"] and tag in dictionnaire.keys():
element["tags"][tag] = dictionnaire[element["tags"][tag]]
return data
def archivage(self, dossier_archive):
"""Archivage des données précédentes"""
fichier = pathlib.Path(self.dossier_resultats + "resultats.ods")
if not fichier.exists():
return
date_fichier = datetime.date.fromtimestamp(fichier.stat().st_ctime)
# une seule archive par date
if os.path.isdir(dossier_archive + str(date_fichier)):
shutil.rmtree(dossier_archive + str(date_fichier))
os.makedirs(dossier_archive + str(date_fichier))
# pylint: disable=W0106
[
shutil.move(
self.dossier_resultats + file, dossier_archive + str(date_fichier)
)
for file in os.listdir(self.dossier_resultats)
if not os.path.isdir(self.dossier_resultats + file)
]
def lecture_requetes(self):
"""Lecture des requêtes dans les fichiers de configuration"""
with open("configuration/requetes.json", encoding="utf-8") as reqs:
self.json_reqs = json.load(reqs)
with open(
"configuration/champs_generiques.json", encoding="utf-8"
) as champs_generiques:
self.json_champs_generiques = json.load(champs_generiques)
for req in self.json_reqs:
self.json_reqs[req]["champs"] = dict(self.json_reqs[req]["champ_local"])
for champ in self.json_reqs[req]["champs_generiques"]:
self.json_reqs[req]["champs"].update(self.json_champs_generiques[champ])
# nettoyage
self.json_reqs[req].pop("champ_local", None)
self.json_reqs[req].pop("champs_generiques", None)