253 lines
8.7 KiB
Python
253 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
# Copyright 2021 Olav63, SebF
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
"""Collections de méthodes utilitaires"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import datetime
|
|
import shutil
|
|
import pathlib
|
|
import csv
|
|
from collections import OrderedDict
|
|
import requests
|
|
from pyexcel_ods3 import save_data
|
|
from osm_vc63 import errors
|
|
|
|
|
|
class Utils:
    """Helpers to query Overpass, reverse-geocode and export/archive results."""

    overpass_url: str
    geo_api_url: str
    dossier_resultats: str

    # Workbook used by save_as_ods() when the caller does not supply one.
    # It is deliberately shared by every call and every instance: successive
    # calls each add a sheet and the whole workbook is rewritten, which is
    # what the former mutable default argument did implicitly.
    _default_ods_sheets = OrderedDict()

    # (element tag written, column read from the geocoding CSV response),
    # in the order the tags are added to each element.
    _GEOCODING_COLUMNS = (
        ("api_adresse:geometry:coordinates:lon", "result_longitude"),
        ("api_adresse:geometry:coordinates:lat", "result_latitude"),
        ("api_adresse:properties:label", "result_label"),
        ("api_adresse:properties:housenumber", "result_housenumber"),
        ("api_adresse:properties:type", "result_type"),
        ("api_adresse:properties:name", "result_name"),
        ("api_adresse:properties:postcode", "result_postcode"),
        ("api_adresse:properties:citycode", "result_citycode"),
        ("api_adresse:properties:city", "result_city"),
        ("api_adresse:properties:street", "result_street"),
    )

    def __init__(self, overpass_url, geo_api_url, dossier_resultats):
        """Store the service URLs and the results directory prefix.

        ``dossier_resultats`` is concatenated directly with file names, so it
        is expected to end with a path separator.
        """
        self.overpass_url = overpass_url
        self.geo_api_url = geo_api_url
        self.dossier_resultats = dossier_resultats

    @staticmethod
    def _ods_cell(field, tags):
        """Return the spreadsheet cell for ``field``; "" when the tag is absent."""
        if field not in tags:
            return ""
        value = tags[field]
        # Store purely numeric capacities as integers rather than text.
        if field == "capacity" and isinstance(value, str) and value.isdigit():
            return int(value)
        return value

    @staticmethod
    def _coordinates(element):
        """Return ``(lat, lon)`` of a node, or of the ``center`` of other types."""
        position = element if element["type"] == "node" else element["center"]
        return position["lat"], position["lon"]

    @staticmethod
    def _umap_description(tags, overpass_query_fields):
        """Build the description text from the fields flagged for JSON export."""
        lines = []
        for tag, spec in overpass_query_fields.items():
            if spec["export_json"] != "Oui" or tag not in tags:
                continue
            label = spec["FR"]
            prefix = label + " : " if label != "" else ""
            lines.append(prefix + str(tags[tag]) + "\n")
        return "".join(lines)

    def save_as_ods(self, fields, data, nom_req, ods_data_sheet=None):
        """Save ``data`` as one sheet of the ``resultats.ods`` workbook.

        Args:
            fields: mapping whose keys are the tag names to export; they form
                the header row and define the column order.
            data: mapping with an ``"elements"`` list; each element's
                ``"tags"`` mapping supplies the cell values (missing tags
                give empty cells).
            nom_req: name of the sheet to create or replace.
            ods_data_sheet: workbook mapping (sheet name -> rows) to update.
                When omitted, a workbook shared by all calls is used, so
                sheets from earlier calls are written again.
        """
        if ods_data_sheet is None:
            ods_data_sheet = Utils._default_ods_sheets

        rows = [list(fields)]
        for element in data["elements"]:
            tags = element.get("tags", {})
            rows.append([self._ods_cell(field, tags) for field in fields])

        ods_data_sheet[f"{nom_req}"] = rows

        save_data(self.dossier_resultats + "resultats.ods", ods_data_sheet)

        logging.info("Sauvegarde résultats format ODS")

    def save_as_json(self, export_json, nom_req):
        """Write ``export_json`` to ``<dossier_resultats><nom_req>.json``."""
        chemin = self.dossier_resultats + nom_req + ".json"
        # The context manager closes the file even if serialisation fails.
        with open(chemin, "w", encoding="utf-8") as json_file:
            json_file.write(json.dumps(export_json))

        logging.info("Sauvegarde résultat format JSON/OSM")

    def nettoyage_json_pour_umap(self, data, overpass_query_fields):
        """Return a reduced copy of ``data`` keeping only exportable fields.

        Each output element keeps its type, id and position, and gets a single
        ``description`` tag made of one line per field whose ``"export_json"``
        is ``"Oui"`` and which is present on the element. The line is prefixed
        with the field's ``"FR"`` label when that label is not empty.

        Args:
            data: mapping with ``"version"``, ``"generator"``, ``"osm3s"`` and
                ``"elements"`` keys.
            overpass_query_fields: mapping tag name -> mapping holding at
                least the ``"export_json"`` and ``"FR"`` keys.

        Returns:
            A new dict; ``data`` itself is not modified.
        """
        export_json = {
            "version": data["version"],
            "generator": data["generator"] + " and ETALAB API",
            "osm3s": data["osm3s"],
            "elements": [],
        }

        for element in data["elements"]:
            cleaned = {"type": element["type"], "id": element["id"]}

            # Position of the element.
            if element["type"] == "node":
                cleaned["lat"] = element["lat"]
                cleaned["lon"] = element["lon"]
            else:  # ways and relations
                cleaned["center"] = element["center"]
                # Relations list "members" instead of "nodes", so the node
                # list is copied only when the element actually has one.
                if "nodes" in element:
                    cleaned["nodes"] = element["nodes"]

            cleaned["tags"] = {
                "description": self._umap_description(
                    element.get("tags", {}), overpass_query_fields
                )
            }
            export_json["elements"].append(cleaned)

        return export_json

    def run_overpass_query(self, critere, aire_de_recherche):
        """Send the Overpass query and return the decoded JSON response.

        Args:
            critere: Overpass QL statements placed inside the union block;
                every ``aire_de_recherche`` placeholder in the query is
                replaced by the ``aire_de_recherche`` argument.
            aire_de_recherche: text substituted for the placeholder.

        Raises:
            errors.OverpassError: when the HTTP status code is not 200.
        """
        overpass_query = "[out:json];\n(\n" + critere + "\n);\nout center;\n"
        overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)

        response = requests.get(self.overpass_url, params={"data": overpass_query})

        if response.status_code != 200:
            raise errors.OverpassError(response.status_code)

        return response.json()

    def geocodage_csv(self, data):
        """Add an address to every element of ``data`` with one CSV request.

        The coordinates of all elements are posted as a CSV file to
        ``<geo_api_url>/reverse/csv/``. The CSV response is matched back to
        the elements on its ``lat``/``lon`` columns, and the ``result_*``
        columns are copied into ``api_adresse:*`` tags.

        Args:
            data: mapping with an ``"elements"`` list; modified in place.

        Returns:
            The same ``data`` object.

        Raises:
            errors.GeoApiError: when the HTTP status code is not 200.
        """
        url = self.geo_api_url + "/reverse/csv/"
        tmp_name = "tmp_geocodage.csv"

        try:
            # Build the file sent to the API.
            with open(tmp_name, "w", newline="") as tmp_csv_file:
                csv_writer = csv.writer(tmp_csv_file)
                csv_writer.writerow(["lat", "lon"])
                csv_writer.writerows(
                    self._coordinates(element) for element in data["elements"]
                )

            # Prepare and send the request.
            with open(tmp_name, "rb") as tmp_csv_file:
                payload = {"data": (tmp_name, tmp_csv_file.read())}
            response = requests.post(url, files=payload)
        finally:
            # Clean up even when writing or the request fails.
            if os.path.exists(tmp_name):
                os.remove(tmp_name)

        if response.status_code != 200:
            raise errors.GeoApiError(response.status_code)

        # Parse the response once; for duplicate coordinates the last row wins.
        rows_by_position = {
            (row["lat"], row["lon"]): row
            for row in csv.DictReader(response.text.splitlines())
        }

        # Assign the addresses.
        for element in data["elements"]:
            lat, lon = self._coordinates(element)
            row = rows_by_position.get((str(lat), str(lon)))
            if row is None:
                continue
            tags = element.setdefault("tags", {})
            for tag_name, column in self._GEOCODING_COLUMNS:
                tags[tag_name] = row[column]

        logging.info("Géocodage inversé terminé")

        return data

    def traduction(self, tag, dictionnaire, data):
        """Translate the ``tag`` value of each element using ``dictionnaire``.

        Values that have no entry in ``dictionnaire`` are left unchanged.

        Args:
            tag: name of the tag to translate.
            dictionnaire: mapping original value -> translated value.
            data: mapping with an ``"elements"`` list; modified in place.

        Returns:
            The same ``data`` object.
        """
        for element in data["elements"]:
            tags = element.get("tags", {})
            if tag in tags:
                tags[tag] = dictionnaire.get(tags[tag], tags[tag])

        return data

    def archivage(self, dossier_archive):
        """Move the previous result files into a dated archive directory.

        Nothing happens when ``resultats.ods`` does not exist in the results
        directory. Otherwise every regular file of that directory is moved to
        ``<dossier_archive><date>``, where the date comes from the ``st_ctime``
        of ``resultats.ods``. Sub-directories are left in place.

        Args:
            dossier_archive: archive directory prefix; the date is appended
                directly, so it is expected to end with a path separator.
        """
        fichier = pathlib.Path(self.dossier_resultats + "resultats.ods")

        if not fichier.exists():
            return

        date_fichier = datetime.date.fromtimestamp(fichier.stat().st_ctime)
        destination = dossier_archive + str(date_fichier)
        os.makedirs(destination, exist_ok=True)

        for nom in os.listdir(self.dossier_resultats):
            source = self.dossier_resultats + nom
            if os.path.isdir(source):
                continue
            # Target an explicit file path: moving into a directory raises
            # shutil.Error when a file of the same name is already archived.
            shutil.move(source, os.path.join(destination, nom))