outils_OSM/osm_vc63/utils.py

257 lines
8.8 KiB
Python

#!/usr/bin/env python3
# Copyright 2021 Olav63, SebF
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Collections de méthodes utilitaires"""
import json
import logging
import os
import datetime
import shutil
import pathlib
import csv
from collections import OrderedDict
import requests
from pyexcel_ods3 import save_data
from osm_vc63 import errors
class Utils:
"""Classe de méthodes utilitaires"""
overpass_url: str
geo_api_url: str
dossier_resultats: str
def __init__(self, overpass_url, geo_api_url, dossier_resultats):
self.overpass_url = overpass_url
self.geo_api_url = geo_api_url
self.dossier_resultats = dossier_resultats
def save_as_ods(self, fields, data, nom_req, ods_data_sheet=OrderedDict()):
"""Sauvegarde de data dans un classeur ods"""
# ods_data_sheet = OrderedDict()
ods_data = []
ods_data.append(fields.keys())
index_line = 2
for element in data["elements"]:
line = []
index_col = 0
for field in fields.keys():
if field in element["tags"]:
if field == "capacity":
val = element["tags"][field]
line.append(int(val) if val.isdigit() else val)
else:
line.append(element["tags"][field])
else:
line.append("")
index_col = index_col + 1
ods_data.append(line)
index_line = index_line + 1
ods_data_sheet.update({f"{nom_req}": ods_data})
save_data(self.dossier_resultats + "resultats.ods", ods_data_sheet)
logging.info("Sauvegarde résultats format ODS")
def save_as_json(self, export_json, nom_req):
"""Enregistrement du JSON"""
json_file = open(self.dossier_resultats + nom_req + ".json", "w")
json_file.write(json.dumps(export_json))
json_file.close()
logging.info("Sauvegarde résultat format JSON/OSM")
def nettoyage_json_pour_umap(self, data, overpass_query_fields):
"""Sélection uniquement des champs export_json == oui"""
export_json = {
"version": data["version"],
"generator": data["generator"] + " and ETALAB API",
"osm3s": data["osm3s"],
"elements": [],
}
index_line = 0
for element in data["elements"]:
export_json["elements"].append(
{"type": element["type"], "id": element["id"]}
)
# positionnement des éléments
if element["type"] == "node": # noeuds
export_json["elements"][index_line]["lat"] = element["lat"]
export_json["elements"][index_line]["lon"] = element["lon"]
else: # ways et relations
export_json["elements"][index_line]["center"] = element["center"]
export_json["elements"][index_line]["nodes"] = element["nodes"]
# filtrage des tags
description = ""
for tag in overpass_query_fields.keys():
if overpass_query_fields[tag]["export_json"] == "Oui":
if tag in element["tags"]:
if overpass_query_fields[tag]["FR"] != "":
description = (
description + overpass_query_fields[tag]["FR"] + " : "
)
description = description + str(element["tags"][tag]) + "\n"
export_json["elements"][index_line]["tags"] = {"description": description}
index_line = index_line + 1
return export_json
def run_overpass_query(self, critere, aire_de_recherche):
"""Envoie la requête Overpass et retourne la réponse JSON."""
overpass_query = (
"""[out:json];
(
"""
+ critere
+ """
);
out center;
"""
)
overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)
response = requests.get(self.overpass_url, params={"data": overpass_query})
if response.status_code != 200:
raise errors.OverpassError(response.status_code)
return response.json()
def geocodage_csv(self, data):
"""
Renseigne une adresse pour chaque élément de data
en une fois via csv
"""
url = self.geo_api_url + "/reverse/csv/"
# création du fichier à envoyer à l'API
with open("tmp_geocodage.csv", "w", newline="") as tmp_csv_file:
csv_writer = csv.writer(tmp_csv_file)
csv_writer.writerow(["lat", "lon"])
for element in data["elements"]:
if element["type"] == "node":
csv_writer.writerow([element["lat"], element["lon"]])
else:
csv_writer.writerow(
[element["center"]["lat"], element["center"]["lon"]]
)
# préparation et envoi de la requête
payload = dict(
[("data", ("tmp_geocodage.csv", open("tmp_geocodage.csv", "rb").read()))]
)
response = requests.post(url, files=payload)
# nettoyage
os.remove("tmp_geocodage.csv")
if response.status_code != 200:
raise errors.GeoApiError(response.status_code)
# affectation des addresses
for element in data["elements"]:
for row in csv.DictReader(response.text.splitlines()):
if element["type"] == "node":
lat_ok = row["lat"] == str(element["lat"])
lon_ok = row["lon"] == str(element["lon"])
else:
lat_ok = row["lat"] == str(element["center"]["lat"])
lon_ok = row["lon"] == str(element["center"]["lon"])
if lat_ok and lon_ok:
element["tags"]["api_adresse:geometry:coordinates:lon"] = row[
"result_longitude"
]
element["tags"]["api_adresse:geometry:coordinates:lat"] = row[
"result_latitude"
]
element["tags"]["api_adresse:properties:label"] = row[
"result_label"
]
element["tags"]["api_adresse:properties:housenumber"] = row[
"result_housenumber"
]
element["tags"]["api_adresse:properties:type"] = row["result_type"]
element["tags"]["api_adresse:properties:name"] = row["result_name"]
element["tags"]["api_adresse:properties:postcode"] = row[
"result_postcode"
]
element["tags"]["api_adresse:properties:citycode"] = row[
"result_citycode"
]
element["tags"]["api_adresse:properties:city"] = row["result_city"]
element["tags"]["api_adresse:properties:street"] = row[
"result_street"
]
logging.info("Géocodage inversé terminé")
return data
def traduction(self, tag, dictionnaire, data):
"""Traduit le champ tag des éléments de data avec dict"""
for element in data["elements"]:
if tag in element["tags"]:
element["tags"][tag] = dictionnaire[element["tags"][tag]]
return data
def archivage(self, dossier_archive):
"""Archivage des données précédentes"""
fichier = pathlib.Path(self.dossier_resultats + "resultats.ods")
if not fichier.exists():
return
date_fichier = datetime.date.fromtimestamp(fichier.stat().st_ctime)
# une seule archive par date
if os.path.isdir(dossier_archive + str(date_fichier)):
shutil.rmtree(dossier_archive + str(date_fichier))
os.makedirs(dossier_archive + str(date_fichier))
# pylint: disable=W0106
[
shutil.move(
self.dossier_resultats + file, dossier_archive + str(date_fichier)
)
for file in os.listdir(self.dossier_resultats)
if not os.path.isdir(self.dossier_resultats + file)
]