#!/usr/bin/env python3

# Copyright 2021 Olav63, SebF
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Collection of utility methods."""

import csv
import datetime
import json
import logging
import os
import pathlib
import shutil
from collections import OrderedDict

import requests
from pyexcel_ods3 import save_data

from osm_vc63 import errors


class Utils:
    """Utility methods."""

    overpass_url: str
    geo_api_url: str
    dossier_resultats: str

    def __init__(self, overpass_url, geo_api_url, dossier_resultats):
        self.overpass_url = overpass_url
        self.geo_api_url = geo_api_url
        self.dossier_resultats = dossier_resultats

        with open("traductions.json", encoding="utf-8") as trads:
            self.traductions = json.load(trads)

        self.lecture_requetes()

    def save_as_ods(self, fields, data, nom_req, ods_data_sheet=OrderedDict()):
        """Save data to an ODS workbook.

        The ods_data_sheet parameter is deliberately a mutable default: it is
        evaluated only once, when the function is defined, so each call adds
        its data as a new sheet of the same workbook.
        """

        ods_data = [list(fields.keys())]

        for element in data["elements"]:
            line = []
            for field in fields.keys():
                if field in element["tags"]:
                    if field == "capacity":
                        # keep numeric capacities as integers so the
                        # spreadsheet can sort and sum them
                        val = element["tags"][field]
                        line.append(int(val) if val.isdigit() else val)
                    else:
                        line.append(element["tags"][field])
                else:
                    line.append("")
            ods_data.append(line)

        ods_data_sheet.update({f"{nom_req}": ods_data})

        save_data(self.dossier_resultats + "resultats.ods", ods_data_sheet)

        logging.info("Results saved in ODS format")

    def save_as_json(self, export_json, nom_req):
        """Save the JSON export."""

        with open(
            self.dossier_resultats + nom_req + ".json", "w", encoding="utf-8"
        ) as json_file:
            json_file.write(json.dumps(export_json))

        logging.info("Results saved in JSON/OSM format")

    def nettoyage_json_pour_umap(self, data, overpass_query_fields):
        """Keep only the fields marked export_json == "Oui"."""

        export_json = {
            "version": data["version"],
            "generator": data["generator"] + " and ETALAB API",
            "osm3s": data["osm3s"],
            "elements": [],
        }

        index_line = 0

        for element in data["elements"]:
            export_json["elements"].append(
                {"type": element["type"], "id": element["id"]}
            )

            # element position
            if element["type"] == "node":
                # nodes
                export_json["elements"][index_line]["lat"] = element["lat"]
                export_json["elements"][index_line]["lon"] = element["lon"]
            else:
                # ways and relations
                export_json["elements"][index_line]["center"] = element["center"]
                export_json["elements"][index_line]["nodes"] = element["nodes"]

            # tag filtering
            description = ""

            for tag in overpass_query_fields.keys():
                if overpass_query_fields[tag]["export_json"] == "Oui":
                    if tag in element["tags"]:
                        if overpass_query_fields[tag]["FR"] != "":
                            description = (
                                description + overpass_query_fields[tag]["FR"] + " : "
                            )
                        description = description + str(element["tags"][tag]) + "\n"

            export_json["elements"][index_line]["tags"] = {"description": description}

            index_line = index_line + 1

        return export_json
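    # A minimal sketch of the field mapping consumed by nettoyage_json_pour_umap
    # above. The tag names and labels are illustrative assumptions, not the
    # project's actual configuration:
    #
    #   fields = {
    #       "amenity": {"export_json": "Oui", "FR": "Type"},
    #       "capacity": {"export_json": "Non", "FR": "Capacité"},
    #   }
    #
    # With element["tags"] == {"amenity": "bench", "capacity": "2"}, only
    # "amenity" is exported and the uMap description becomes "Type : bench\n".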
    def run_overpass_query(self, critere, aire_de_recherche):
        """Send the Overpass query and return the JSON response."""

        overpass_query = (
            """[out:json];
            (
            """
            + critere
            + """
            );
            out center;
            """
        )
        overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)

        response = requests.get(self.overpass_url, params={"data": overpass_query})

        if response.status_code != 200:
            raise errors.OverpassError(response.status_code)

        return response.json()

    def geocodage_csv(self, data):
        """Fill in an address for every element of data in one batch call via CSV."""

        url = self.geo_api_url + "/reverse/csv/"

        # build the file to send to the API
        with open(
            "tmp_geocodage.csv", "w", newline="", encoding="utf-8"
        ) as tmp_csv_file:
            csv_writer = csv.writer(tmp_csv_file)
            csv_writer.writerow(["lat", "lon"])
            for element in data["elements"]:
                if element["type"] == "node":
                    csv_writer.writerow([element["lat"], element["lon"]])
                else:
                    csv_writer.writerow(
                        [element["center"]["lat"], element["center"]["lon"]]
                    )

        # prepare and send the request; the file handle is closed afterwards
        with open("tmp_geocodage.csv", "rb") as tmp_csv_file:
            payload = {"data": ("tmp_geocodage.csv", tmp_csv_file.read())}

        response = requests.post(url, files=payload)

        # cleanup
        os.remove("tmp_geocodage.csv")

        if response.status_code != 200:
            raise errors.GeoApiError(response.status_code)

        # address assignment
        for element in data["elements"]:
            for row in csv.DictReader(response.text.splitlines()):
                if element["type"] == "node":
                    lat_ok = row["lat"] == str(element["lat"])
                    lon_ok = row["lon"] == str(element["lon"])
                else:
                    lat_ok = row["lat"] == str(element["center"]["lat"])
                    lon_ok = row["lon"] == str(element["center"]["lon"])

                if lat_ok and lon_ok:
                    element["tags"]["api_adresse:geometry:coordinates:lon"] = row[
                        "result_longitude"
                    ]
                    element["tags"]["api_adresse:geometry:coordinates:lat"] = row[
                        "result_latitude"
                    ]
                    element["tags"]["api_adresse:properties:label"] = row[
                        "result_label"
                    ]
                    element["tags"]["api_adresse:properties:housenumber"] = row[
                        "result_housenumber"
                    ]
                    element["tags"]["api_adresse:properties:type"] = row["result_type"]
                    element["tags"]["api_adresse:properties:name"] = row["result_name"]
                    element["tags"]["api_adresse:properties:postcode"] = row[
                        "result_postcode"
                    ]
                    element["tags"]["api_adresse:properties:citycode"] = row[
                        "result_citycode"
                    ]
                    element["tags"]["api_adresse:properties:city"] = row["result_city"]
                    element["tags"]["api_adresse:properties:street"] = row[
                        "result_street"
                    ]

        logging.info("Reverse geocoding done")

        return data

    def traduction(self, tag, dictionnaire, data):
        """Translate the tag field of data's elements using dictionnaire."""

        for element in data["elements"]:
            if tag in element["tags"]:
                element["tags"][tag] = dictionnaire[element["tags"][tag]]

        return data

    def archivage(self, dossier_archive):
        """Archive the previous results."""

        fichier = pathlib.Path(self.dossier_resultats + "resultats.ods")

        if not fichier.exists():
            return

        date_fichier = datetime.date.fromtimestamp(fichier.stat().st_ctime)

        # only one archive per date
        if os.path.isdir(dossier_archive + str(date_fichier)):
            shutil.rmtree(dossier_archive + str(date_fichier))

        os.makedirs(dossier_archive + str(date_fichier))

        for file in os.listdir(self.dossier_resultats):
            if not os.path.isdir(self.dossier_resultats + file):
                shutil.move(
                    self.dossier_resultats + file, dossier_archive + str(date_fichier)
                )
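    # A hypothetical sketch of the configuration files merged by
    # lecture_requetes below; the query name, field names and generic group
    # are illustrative assumptions, not the project's actual files:
    #
    #   requetes.json:
    #     {"defibrillateurs": {"champ_local": {"emergency": {...}},
    #                          "champs_generiques": ["adresse"]}}
    #
    #   champs_generiques.json:
    #     {"adresse": {"addr:street": {...}, "addr:housenumber": {...}}}
    #
    # After merging, each query carries a single "champs" mapping and the two
    # source keys are removed.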
    def lecture_requetes(self):
        """Read the queries from the configuration files."""

        with open("requetes.json", encoding="utf-8") as reqs:
            self.json_reqs = json.load(reqs)

        with open("champs_generiques.json", encoding="utf-8") as champs_generiques:
            self.json_champs_generiques = json.load(champs_generiques)

        for req in self.json_reqs:
            # start from the query's own fields, then merge in the generic ones
            self.json_reqs[req]["champs"] = dict(self.json_reqs[req]["champ_local"])
            for champ in self.json_reqs[req]["champs_generiques"]:
                self.json_reqs[req]["champs"].update(self.json_champs_generiques[champ])

            # cleanup
            self.json_reqs[req].pop("champ_local", None)
            self.json_reqs[req].pop("champs_generiques", None)
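
# A minimal usage sketch, assuming the configuration files (traductions.json,
# requetes.json, champs_generiques.json) are present in the working directory.
# The two URLs are public endpoints commonly used for Overpass and the ETALAB
# address API; the criterion and area id are purely illustrative:
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    utils = Utils(
        overpass_url="https://overpass-api.de/api/interpreter",
        geo_api_url="https://api-adresse.data.gouv.fr",
        dossier_resultats="resultats/",
    )

    # "aire_de_recherche" is the placeholder substituted by run_overpass_query;
    # 3600110866 stands in for an Overpass area id
    data = utils.run_overpass_query(
        'nwr["emergency"="defibrillator"](area:aire_de_recherche);',
        "3600110866",
    )
    logging.info("%s elements retrieved", len(data["elements"]))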