forked from Olav63/outils_OSM
252 lines
8.5 KiB
Python
252 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Collections de méthodes utilitaires"""
|
|
|
|
import json
|
|
from collections import OrderedDict
|
|
import requests
|
|
from pyexcel_ods3 import save_data
|
|
from osm_vc63 import errors
|
|
|
|
|
|
class Utils:
    """Utility methods for querying Overpass, geocoding and exporting results.

    Attributes:
        overpass_url: URL of the Overpass API endpoint queried by
            ``run_overpass_query``.
        geo_api_url: Base URL of the geocoding API; ``/reverse/`` is appended
            to it by ``run_reverse_geocoding``.
        dossier_sauvegarde: Prefix prepended verbatim to output file names
            (plain string concatenation, so it presumably ends with a path
            separator -- confirm against callers).
    """

    overpass_url: str
    geo_api_url: str
    dossier_sauvegarde: str

    def __init__(self, overpass_url, geo_api_url, dossier_sauvegarde):
        self.overpass_url = overpass_url
        self.geo_api_url = geo_api_url
        self.dossier_sauvegarde = dossier_sauvegarde

    def save_as_ods(self, fields, data, nom_req):
        """Save the elements of ``data`` to an ODS workbook.

        Args:
            fields: Mapping whose keys are the tag names to export; they form
                the header row and fix the column order.
            data: Dict with an ``"elements"`` list; each element's ``"tags"``
                dict supplies the cell values.
            nom_req: Name of the request, used as the output file stem.

        The workbook holds a single sheet named ``"resultats"``. Tags missing
        from an element produce an empty cell. Purely numeric ``capacity``
        values are written as integers.
        """
        # A concrete list (rather than a dict view) for the header row.
        rows = [list(fields)]

        for element in data["elements"]:
            tags = element.get("tags", {})
            row = []
            for field in fields:
                if field not in tags:
                    row.append("")
                    continue
                value = tags[field]
                if (
                    field == "capacity"
                    and isinstance(value, str)
                    and value.isdigit()
                ):
                    value = int(value)
                row.append(value)
            rows.append(row)

        ods_data_sheet = OrderedDict()
        ods_data_sheet.update({"resultats": rows})

        save_data(self.dossier_sauvegarde + nom_req + ".ods", ods_data_sheet)

        print("Sauvegarde résultats format ODS")

    def save_as_json(self, export_json, nom_req):
        """Write ``export_json`` as JSON to ``<dossier_sauvegarde><nom_req>.json``.

        Args:
            export_json: JSON-serialisable object to store.
            nom_req: Name of the request, used as the output file stem.
        """
        path = self.dossier_sauvegarde + nom_req + ".json"
        # Serialise first so a failure does not leave a truncated file, and
        # use a context manager so the handle is always closed.
        payload = json.dumps(export_json)
        with open(path, "w", encoding="utf-8") as json_file:
            json_file.write(payload)

        print("Sauvegarde résultat format JSON/OSM")

    def nettoyage_json_pour_umap(self, data, overpass_query_fields):
        """Build a reduced copy of ``data`` keeping only exportable fields.

        Args:
            data: Overpass-style response dict (``version``, ``generator``,
                ``osm3s``, ``elements``).
            overpass_query_fields: Mapping ``tag -> {"export_json": ...,
                "FR": ...}``. Only tags whose ``export_json`` equals ``"Oui"``
                are kept; ``FR`` is the label shown before the value (no
                label when it is the empty string).

        Returns:
            A new dict whose elements carry their type, id, position and a
            single ``description`` tag made of one ``"label : value"`` line
            per exported tag.
        """
        export_json = {
            "version": data["version"],
            "generator": data["generator"] + " and ETALAB API",
            "osm3s": data["osm3s"],
            "elements": [],
        }

        for element in data["elements"]:
            cleaned = {"type": element["type"], "id": element["id"]}

            # Positioning of the element.
            if element["type"] == "node":
                cleaned["lat"] = element["lat"]
                cleaned["lon"] = element["lon"]
            else:
                # Ways and relations: relations have no "nodes" list, so copy
                # only the geometry keys that are actually present.
                for key in ("center", "nodes"):
                    if key in element:
                        cleaned[key] = element[key]

            # Tag filtering: fold the exported tags into one description.
            tags = element.get("tags", {})
            lines = []
            for tag, options in overpass_query_fields.items():
                if options["export_json"] != "Oui" or tag not in tags:
                    continue
                label = options["FR"]
                prefix = label + " : " if label != "" else ""
                lines.append(prefix + str(tags[tag]) + "\n")
            cleaned["tags"] = {"description": "".join(lines)}

            export_json["elements"].append(cleaned)

        return export_json

    def run_overpass_query(self, critere, aire_de_recherche):
        """Send the Overpass query and return the decoded JSON response.

        Args:
            critere: Overpass QL statements; every occurrence of the literal
                text ``aire_de_recherche`` in the final query is replaced by
                the ``aire_de_recherche`` argument.
            aire_de_recherche: Search-area expression substituted into the
                query.

        Raises:
            errors.OverpassError: If the HTTP status code is not 200.
        """
        overpass_query = (
            """[out:json];
            (
            """
            + critere
            + """
            );
            out center;
            """
        )
        overpass_query = overpass_query.replace(
            "aire_de_recherche", aire_de_recherche
        )

        response = requests.get(self.overpass_url, params={"data": overpass_query})

        if response.status_code != 200:
            raise errors.OverpassError(response.status_code)

        return response.json()

    def run_reverse_geocoding(self, lat, lon):
        """Return the JSON address found for a GPS position.

        Args:
            lat: Latitude of the position.
            lon: Longitude of the position.

        Raises:
            errors.GeoApiError: If the HTTP status code is not 200.
        """
        url = self.geo_api_url + "/reverse/"

        response = requests.get(url, params={"lon": str(lon), "lat": str(lat)})

        if response.status_code != 200:
            raise errors.GeoApiError(response.status_code)

        return response.json()

    # TODO: optimise by calling the bulk /reverse/csv/ service instead of the
    # one-position-at-a-time /reverse/ service.
    def geocodage(self, data):
        """Add address tags to every element of ``data``.

        Each element is reverse-geocoded from its own coordinates (nodes) or
        from its ``center`` (other types), and the first returned feature is
        copied into ``api_adresse:*`` tags. Elements for which the service
        returns no feature are left unchanged.

        Args:
            data: Dict with an ``"elements"`` list; modified in place.

        Returns:
            The same ``data`` object.
        """
        # Properties copied from the feature, in output order. Each one is
        # copied only when the geocoder actually returned it.
        property_names = (
            "label",
            "score",
            "housenumber",
            "type",
            "name",
            "postcode",
            "citycode",
            "city",
            "street",
        )

        for element in self.progress_bar(data["elements"], prefix="Géocodage"):
            position = element if element["type"] == "node" else element["center"]
            rev_geocode = self.run_reverse_geocoding(position["lat"], position["lon"])

            features = rev_geocode.get("features")
            if not features:
                # No address found near this position: nothing to add.
                continue
            api_adresse = features[0]

            tags = element.setdefault("tags", {})

            coordinates = api_adresse["geometry"]["coordinates"]
            tags["api_adresse:geometry:coordinates:lon"] = coordinates[0]
            tags["api_adresse:geometry:coordinates:lat"] = coordinates[1]

            properties = api_adresse["properties"]
            for name in property_names:
                if name in properties:
                    tags["api_adresse:properties:" + name] = properties[name]

            tags["api_adresse:properties:attribution"] = rev_geocode["attribution"]
            tags["api_adresse:properties:licence"] = rev_geocode["licence"]

        return data

    def traduction(self, tag, dictionnaire, data):
        """Translate the ``tag`` value of every element using ``dictionnaire``.

        Args:
            tag: Name of the tag to translate.
            dictionnaire: Mapping from original values to translated values.
            data: Dict with an ``"elements"`` list; modified in place.

        Returns:
            The same ``data`` object. Values that have no entry in
            ``dictionnaire`` are kept as they are.
        """
        for element in data["elements"]:
            tags = element.get("tags", {})
            if tag in tags:
                value = tags[tag]
                # Fall back to the untranslated value instead of failing on
                # a value the dictionary does not know.
                tags[tag] = dictionnaire.get(value, value)

        return data

    def progress_bar(
        # pylint:disable=C0330
        self,
        iterable,
        decimals=1,
        length=50,
        prefix="",
        fill="█",
        print_end="\r",
    ):
        """Yield the items of ``iterable`` while drawing a terminal progress bar.

        Args:
            iterable: Sized iterable to walk through (``len()`` is called).
            decimals: Number of decimals shown in the percentage.
            length: Width of the bar in characters.
            prefix: Text printed before the bar.
            fill: Character used for the completed part of the bar.
            print_end: String passed as ``end`` to ``print`` for each update.

        Yields:
            Each item of ``iterable``, in order. Nothing is printed or
            yielded when ``iterable`` is empty.
        """
        total = len(iterable)

        if total == 0:
            return

        # Initial call: empty bar.
        print(f"\r{prefix} |{'-' * length}| 0%", end=print_end)

        # The bar is refreshed after the consumer has processed each item.
        for done, item in enumerate(iterable, start=1):
            yield item
            percent = f"{100 * (done / float(total)):.{decimals}f}"
            filled = int(length * done // total)
            progress = fill * filled + "-" * (length - filled)
            print(f"\r{prefix} |{progress}| {percent}%", end=print_end)

        # Move to a new line once complete.
        print()
|