RDOO/osm_vc63/utils.py

252 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""Collections de méthodes utilitaires"""
import json
from collections import OrderedDict
import requests
from pyexcel_ods3 import save_data
from osm_vc63 import errors
class Utils:
    """Utility methods to query Overpass, reverse-geocode and export results.

    Attributes:
        overpass_url: URL of the Overpass interpreter endpoint.
        geo_api_url: Base URL of the geocoding API ("/reverse/" is appended).
        dossier_sauvegarde: Prefix prepended as-is to every output file name
            (it is concatenated, so it must end with a path separator).
    """

    overpass_url: str
    geo_api_url: str
    dossier_sauvegarde: str

    def __init__(self, overpass_url, geo_api_url, dossier_sauvegarde):
        """Store the service URLs and the output folder prefix."""
        self.overpass_url = overpass_url
        self.geo_api_url = geo_api_url
        self.dossier_sauvegarde = dossier_sauvegarde

    def save_as_ods(self, fields, data, nom_req):
        """Save ``data`` into an ODS workbook with a single "resultats" sheet.

        Args:
            fields: Mapping whose keys are the tag names to export; the keys
                form the header row and define the column order.
            data: Dict with an "elements" list; each element has a "tags" dict.
            nom_req: Query name, used as the file name (without extension).

        A tag missing from an element yields an empty cell. The "capacity"
        tag is stored as an int when its value is purely decimal digits.
        """
        header = list(fields)
        rows = [header]
        for element in data["elements"]:
            tags = element["tags"]
            row = []
            for field in header:
                if field not in tags:
                    row.append("")
                    continue
                value = tags[field]
                # isdecimal() rather than isdigit(): the latter also accepts
                # characters such as "²" that int() refuses to parse.
                if field == "capacity" and value.isdecimal():
                    value = int(value)
                row.append(value)
            rows.append(row)

        ods_data_sheet = OrderedDict()
        ods_data_sheet.update({"resultats": rows})
        save_data(self.dossier_sauvegarde + nom_req + ".ods", ods_data_sheet)
        print("Sauvegarde résultats format ODS pour " + nom_req)

    def save_as_json(self, export_json, nom_req):
        """Write ``export_json`` as JSON to "<dossier_sauvegarde><nom_req>.json"."""
        path = self.dossier_sauvegarde + nom_req + ".json"
        # The context manager closes the file even if serialisation fails.
        with open(path, "w", encoding="utf-8") as json_file:
            json_file.write(json.dumps(export_json))
        print("Sauvegarde résultat format JSON/OSM " + nom_req)

    def nettoyage_json_pour_umap(self, data, overpass_query_fields):
        """Build a reduced copy of ``data`` keeping only exportable fields.

        Args:
            data: Overpass-style response dict with "version", "generator",
                "osm3s" and "elements" keys.
            overpass_query_fields: Mapping tag name -> settings dict with an
                "export_json" flag ("Oui" to export) and an "FR" label.

        Returns:
            A new dict with the same header keys, where each element keeps its
            type, id and position, and whose tags are collapsed into a single
            "description" tag: one "<FR label> : <value>" line per exported
            tag (the label part is omitted when the label is empty).
        """
        export_json = {
            "version": data["version"],
            "generator": data["generator"] + " and ETALAB API",
            "osm3s": data["osm3s"],
            "elements": [],
        }

        for element in data["elements"]:
            exported = {"type": element["type"], "id": element["id"]}

            # Positioning of the element.
            if element["type"] == "node":
                exported["lat"] = element["lat"]
                exported["lon"] = element["lon"]
            else:  # ways and relations
                exported["center"] = element["center"]
                # Relations have no "nodes" list; copy it only when present.
                if "nodes" in element:
                    exported["nodes"] = element["nodes"]

            # Tag filtering.
            description = ""
            for tag, settings in overpass_query_fields.items():
                if settings["export_json"] != "Oui" or tag not in element["tags"]:
                    continue
                if settings["FR"] != "":
                    description += settings["FR"] + " : "
                description += str(element["tags"][tag]) + "\n"

            exported["tags"] = {"description": description}
            export_json["elements"].append(exported)

        return export_json

    def run_overpass_query(self, critere, aire_de_recherche):
        """Send the Overpass query and return the decoded JSON response.

        Args:
            critere: Overpass QL statements, inserted between the parentheses
                of a union; every "aire_de_recherche" placeholder in the
                final query is replaced.
            aire_de_recherche: Text substituted for the placeholder.

        Raises:
            errors.OverpassError: If the HTTP status code is not 200.
        """
        overpass_query = "[out:json];\n(\n" + critere + "\n);\nout center;\n"
        overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)

        response = requests.get(self.overpass_url, params={"data": overpass_query})
        if response.status_code != 200:
            raise errors.OverpassError(response.status_code)
        return response.json()

    def run_reverse_geocoding(self, lat, lon):
        """Return the JSON address found for a GPS position.

        Raises:
            errors.GeoApiError: If the HTTP status code is not 200.
        """
        url = self.geo_api_url + "/reverse/"
        response = requests.get(url, params={"lon": str(lon), "lat": str(lat)})
        if response.status_code != 200:
            raise errors.GeoApiError(response.status_code)
        return response.json()

    # TODO: optimise by calling the batch /reverse/csv/ service instead of
    # the one-position-at-a-time /reverse/ service.
    def geocodage(self, data):
        """Add address tags to every element of ``data`` (modified in place).

        Nodes are geocoded from their own "lat"/"lon", other elements from
        their "center". The first returned feature is copied into
        "api_adresse:..." tags. An element for which the geocoder returns no
        feature is left unchanged.

        Returns:
            The same ``data`` object.
        """
        for element in self.progress_bar(data["elements"], prefix="Géocodage"):
            position = element if element["type"] == "node" else element["center"]
            rev_geocode = self.run_reverse_geocoding(position["lat"], position["lon"])

            features = rev_geocode.get("features")
            if not features:
                # No address found for this position: nothing to add.
                continue

            api_adresse = features[0]
            coordinates = api_adresse["geometry"]["coordinates"]
            properties = api_adresse["properties"]
            tags = element["tags"]

            tags["api_adresse:geometry:coordinates:lon"] = coordinates[0]
            tags["api_adresse:geometry:coordinates:lat"] = coordinates[1]
            tags["api_adresse:properties:label"] = properties["label"]
            tags["api_adresse:properties:score"] = properties["score"]
            if "housenumber" in properties:
                tags["api_adresse:properties:housenumber"] = properties["housenumber"]
            for key in ("type", "name", "postcode", "citycode", "city"):
                tags["api_adresse:properties:" + key] = properties[key]
            if "street" in properties:
                tags["api_adresse:properties:street"] = properties["street"]
            tags["api_adresse:properties:attribution"] = rev_geocode["attribution"]
            tags["api_adresse:properties:licence"] = rev_geocode["licence"]

        return data

    def traduction(self, tag, dictionnaire, data):
        """Translate the ``tag`` value of each element using ``dictionnaire``.

        Elements without the tag are skipped. A value that has no entry in
        ``dictionnaire`` is kept as-is. ``data`` is modified in place and
        returned.
        """
        for element in data["elements"]:
            tags = element["tags"]
            if tag in tags:
                tags[tag] = dictionnaire.get(tags[tag], tags[tag])
        return data

    def progress_bar(
        # pylint:disable=C0330
        self,
        iterable,
        decimals=1,
        length=50,
        prefix="",
        fill="",
        print_end="\r",
    ):
        """Yield the items of ``iterable`` while printing a progress bar.

        Args:
            iterable: Sized iterable (``len()`` is called on it).
            decimals: Number of decimals shown in the percentage.
            length: Width of the bar, in characters.
            prefix: Text printed before the bar.
            fill: Character used for the completed part of the bar; an empty
                value falls back to "#" so the bar keeps a constant width.
            print_end: Line terminator (carriage return by default, so the
                bar is redrawn on the same terminal line).

        Nothing is printed or yielded when ``iterable`` is empty.
        """
        total = len(iterable)
        if total == 0:
            return

        # An empty fill would make the completed part invisible and shrink
        # the bar, leaving stale characters on screen after a "\r".
        bar_char = fill or "#"

        # Initial call: empty bar.
        print(f"\r{prefix} |{'-' * length}| {0}%", end=print_end)

        # Redraw the bar after each item has been consumed.
        for done, item in enumerate(iterable, start=1):
            yield item
            percent = f"{100 * (done / float(total)):.{decimals}f}"
            filled = int(length * done // total)
            progress = bar_char * filled + "-" * (length - filled)
            print(f"\r{prefix} |{progress}| {percent}%", end=print_end)

        # Final newline once complete.
        print()