forked from Olav63/outils_OSM
138 lines
4.4 KiB
Python
138 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import requests
|
|
from pyexcel_ods3 import save_data
|
|
from collections import OrderedDict
|
|
from osm_vc63 import errors
|
|
|
|
|
|
class Utils:
    """Helpers to run Overpass / reverse-geocoding requests and export results.

    The instance only stores configuration: the two service URLs and the
    path prefix used when writing result files.
    """

    # URL the Overpass queries are sent to (HTTP GET).
    overpass_url: str
    # Base URL of the geocoding API; "/reverse/" is appended to it.
    geo_api_url: str
    # Prefix concatenated verbatim in front of every saved file name, so it
    # must already end with a path separator when it designates a folder.
    dossier_sauvegarde: str

    def __init__(self, overpass_url, geo_api_url, dossier_sauvegarde):
        """Store the service URLs and the output path prefix."""
        self.overpass_url = overpass_url
        self.geo_api_url = geo_api_url
        self.dossier_sauvegarde = dossier_sauvegarde

    @staticmethod
    def _ods_cell(field, tags):
        """Return the spreadsheet cell for *field*, or "" when the tag is absent."""
        if field not in tags:
            return ""
        value = tags[field]
        # Only "capacity" is converted, and only when it is purely numeric,
        # so that the spreadsheet stores it as a number instead of text.
        if field == "capacity" and value.isdigit():
            return int(value)
        return value

    @staticmethod
    def _build_description(tags, overpass_query_fields):
        """Build the multi-line description from the tags flagged for export."""
        parts = []
        for tag, spec in overpass_query_fields.items():
            if spec["export_json"] != "Oui" or tag not in tags:
                continue
            label = spec["FR"]
            # An empty label means the raw value is shown without a prefix.
            prefix = label + " : " if label != "" else ""
            parts.append(prefix + str(tags[tag]) + "\n")
        return "".join(parts)

    def save_as_ods(self, fields, data, nom_req: str) -> None:
        """Save *data* into an ODS workbook.

        The workbook has a single sheet named "resultats": a header row made
        of the keys of *fields*, then one row per entry of
        ``data["elements"]`` holding the matching tag values ("" when the
        element does not carry the tag).

        Args:
            fields: mapping whose keys are the tag names to export, in order.
            data: dict with an "elements" list of element dicts.
            nom_req: request name, used to build the file name.
        """
        # A real list (not a dict view) so the header is a plain row.
        header = list(fields)
        rows = [header]

        for element in data["elements"]:
            # Elements without any tag produce a row of empty cells.
            tags = element.get("tags", {})
            rows.append([self._ods_cell(field, tags) for field in header])

        workbook = OrderedDict([("resultats", rows)])
        save_data(self.dossier_sauvegarde + nom_req + ".ods", workbook)

        print("Sauvegarde résultats format ODS pour " + nom_req)

    def save_as_json(self, export_json, nom_req: str) -> None:
        """Write *export_json* as JSON to ``<dossier_sauvegarde><nom_req>.json``."""
        target = self.dossier_sauvegarde + nom_req + ".json"
        # The context manager closes the file even if serialisation fails.
        with open(target, "w", encoding="utf-8") as json_file:
            json_file.write(json.dumps(export_json))

        print("Sauvegarde résultat format JSON/OSM " + nom_req)

    def nettoyage_json_pour_umap(self, data, overpass_query_fields) -> dict:
        """Keep only the fields whose ``export_json`` flag is "Oui".

        Each element is reduced to its type, id and position, and its tags
        are replaced by a single "description" tag built from the exported
        fields (one line per tag, prefixed by its "FR" label when not empty).

        Args:
            data: response dict with "version", "generator", "osm3s" and
                "elements" keys.
            overpass_query_fields: mapping tag name -> dict with at least the
                "export_json" and "FR" keys.

        Returns:
            A new dict with the same top-level keys and the cleaned elements.
        """
        export_json = {
            "version": data["version"],
            "generator": data["generator"] + " and ETALAB API",
            "osm3s": data["osm3s"],
            "elements": [],
        }

        for element in data["elements"]:
            cleaned = {"type": element["type"], "id": element["id"]}

            # Element position
            if element["type"] == "node":
                cleaned["lat"] = element["lat"]
                cleaned["lon"] = element["lon"]
            else:
                # Ways and relations: copy only what is present, since a
                # relation has no "nodes" list.
                for key in ("center", "nodes"):
                    if key in element:
                        cleaned[key] = element[key]

            # Tag filtering
            description = self._build_description(
                element.get("tags", {}), overpass_query_fields
            )
            cleaned["tags"] = {"description": description}

            export_json["elements"].append(cleaned)

        return export_json

    def run_overpass_query(self, critere: str, aire_de_recherche: str):
        """Send the Overpass query and return the decoded JSON response.

        Every occurrence of the literal text "aire_de_recherche" in the
        query is replaced by the *aire_de_recherche* argument.

        Raises:
            errors.OverpassError: when the HTTP status code is not 200.
        """
        overpass_query = "[out:json];\n(\n" + critere + "\n);\nout center;\n"
        overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)

        print("Execution requete overpass : \n" + overpass_query)
        response = requests.get(self.overpass_url, params={"data": overpass_query})

        if response.status_code != 200:
            raise errors.OverpassError(response.status_code)

        return response.json()

    def run_reverse_geocoding(self, lat, lon):
        """Return the decoded JSON address for a latitude/longitude position.

        Raises:
            errors.GeoApiError: when the HTTP status code is not 200.
        """
        url = self.geo_api_url + "/reverse/"

        response = requests.get(url, params={"lon": str(lon), "lat": str(lat)})

        if response.status_code != 200:
            raise errors.GeoApiError(response.status_code)

        return response.json()
|
|
|