288 lines
8.7 KiB
Python
288 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Inspiration:
# https://towardsdatascience.com/loading-data-from-openstreetmap-with-python-and-the-overpass-api-513882a27fd0
# https://geo.api.gouv.fr/adresse
# https://wiki.cartocite.fr/doku.php?id=umap:10_-_je_valorise_les_donnees_openstreetmap_avec_umap
# https://sites-formations.univ-rennes2.fr/mastersigat/Cours/Intro_Overpass.pdf

# Tag usage: https://taginfo.openstreetmap.org/tags/?key=amenity&value=bicycle_parking#combinations

# Example data URL for uMap: https://www.velocite63.fr/velocite63/OSM/stationnements_velos_publics.json
# Remember to tick "proxy" in the "remote data" ("données distantes") section of the layer.

# ODS export:
# https://pythonhosted.org/pyexcel-ods/
# pip3 install pyexcel-ods3
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
from pyexcel_ods3 import save_data
|
|
from collections import OrderedDict
|
|
import os
|
|
from osm_vc63 import errors
|
|
from osm_vc63 import requetes
|
|
|
|
# Endpoints of the two remote services queried by this script.
overpass_url = "http://overpass-api.de/api/interpreter"
geo_api_url = "https://api-adresse.data.gouv.fr"

# Directory that receives the exported files.
dossier_sauvegarde = "resultats/"

# Maximum number of tries when an API call fails.
max_retry = 4

# Delay, in seconds, between two tries.
retry_delay = 120

# OSM ids of a few areas of interest:
#   "Puy de Dôme" département : 7406
#   Riom                      : 1693144
#   Clermont                  : 110866
#   Romagnat                  : 138269
#
# The area id is obtained by adding 3600000000 to the number of the OSM
# object; the search area used here is Clermont.
aire_de_recherche = str(110866 + 3600000000)
|
|
|
|
|
|
|
|
# ----------------------------------------------
|
|
|
|
# French label for each known value of the OSM ``bicycle_parking`` tag.
# Several OSM values deliberately share the same label.
trad_bicycle_parking = {
    "stands": "Arceaux",
    "wall_loops": "Pince roues",
    "rack": "Râteliers",
    "anchors": "Ancrage",
    "shed": "Abri collectif",
    "bollard": "Potelet",
    "lockers": "Abris individuels",
    "wide_stands": "Arceaux espacés",
    "ground_slots": "Fente dans le sol",
    "building": "Bâtiment",
    "informal": "Informel",
    "wave": "Râteliers",
    "streetpod": "Arceaux",
    "tree": "Arbre à bicyclettes",
    "crossbar": "Barre",
    "rope": "Câble",
    "two-tier": "Deux étages",
    "floor": "Sol",
    "handlebar_holder": "Accroche-guidons",
}
|
|
|
|
# ----------------------------------------------
|
|
|
|
def run_overpass_query(query):
    """Send a query to the Overpass endpoint and return the decoded JSON.

    The query text is passed as the ``data`` parameter of a GET request to
    ``overpass_url``.

    Raises:
        errors.Overpass_error: built from the HTTP status code whenever the
            reply status is not 200.
    """
    reply = requests.get(overpass_url, params={"data": query})
    if reply.status_code == 200:
        return reply.json()
    raise errors.Overpass_error(reply.status_code)
|
|
|
|
|
|
def run_reverse_geocoding(lat, lon):
    """Call the ``/reverse/`` service of the address API for one point.

    ``lat`` and ``lon`` are converted to strings and sent as the ``lat`` and
    ``lon`` parameters of a GET request. The decoded JSON reply is returned.

    Raises:
        errors.Geo_api_error: built from the HTTP status code whenever the
            reply status is not 200.
    """
    position = {"lon": str(lon), "lat": str(lat)}
    reply = requests.get(geo_api_url + "/reverse/", params=position)
    if reply.status_code == 200:
        return reply.json()
    raise errors.Geo_api_error(reply.status_code)
|
|
|
|
# ----------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _geocoder_element(element):
    """Add the reverse-geocoded address of ``element`` to its tags.

    Nodes are located by their own ``lat``/``lon``; any other element by its
    ``center``. The first feature returned by the address API is copied into
    ``api_adresse:*`` tags.

    Returns:
        True when the address API returned at least one feature, False when
        it returned none (the tags are then left without ``api_adresse:*``
        entries instead of raising IndexError).
    """
    position = element if element["type"] == "node" else element["center"]
    rev_geocode = run_reverse_geocoding(position["lat"], position["lon"])

    features = rev_geocode.get("features")
    if not features:
        return False

    api_adresse = features[0]
    coordonnees = api_adresse["geometry"]["coordinates"]
    proprietes = api_adresse["properties"]
    tags = element["tags"]

    tags["api_adresse:geometry:coordinates:lon"] = coordonnees[0]
    tags["api_adresse:geometry:coordinates:lat"] = coordonnees[1]

    # Properties the original code always expected to be present.
    for cle in ("label", "score", "type", "name", "postcode", "citycode", "city"):
        tags["api_adresse:properties:" + cle] = proprietes[cle]

    # Properties that are only copied when the API provides them.
    for cle in ("housenumber", "street"):
        if cle in proprietes:
            tags["api_adresse:properties:" + cle] = proprietes[cle]

    tags["api_adresse:properties:attribution"] = rev_geocode["attribution"]
    tags["api_adresse:properties:licence"] = rev_geocode["licence"]
    return True


def _element_allege(element, overpass_query_fields):
    """Build the lightweight element written to the JSON export.

    Only the geometry and a single ``description`` tag are kept. The
    description has one line per field flagged ``export_json == "Oui"`` that
    is present in the element's tags, prefixed by the field's ``FR`` label
    when that label is not empty.
    """
    allege = {"type": element["type"], "id": element["id"]}

    if element["type"] == "node":
        allege["lat"] = element["lat"]
        allege["lon"] = element["lon"]
    else:
        allege["center"] = element["center"]
        # Not every non-node element carries a "nodes" list (relations list
        # "members" instead), so only copy it when it exists.
        if "nodes" in element:
            allege["nodes"] = element["nodes"]

    tags = element["tags"]
    lignes = []
    for tag, champ in overpass_query_fields.items():
        if champ["export_json"] != "Oui" or tag not in tags:
            continue
        prefixe = champ["FR"] + " : " if champ["FR"] != "" else ""
        lignes.append(prefixe + str(tags[tag]) + "\n")

    allege["tags"] = {"description": "".join(lignes)}
    return allege


def executer_requete_et_exporter_resultats(nom_req, critere, aire_de_recherche, overpass_query_fields):
    """Run one Overpass query, enrich the results and export them.

    Steps:
      1. Build the Overpass query around ``critere`` and substitute every
         occurrence of the text ``aire_de_recherche`` with the given area id.
      2. Reverse-geocode every returned element and store the address in
         ``api_adresse:*`` tags (progress is printed as "X" per geocoded
         element and "-" per element for which no address was found).
      3. Translate the ``bicycle_parking`` tag with ``trad_bicycle_parking``;
         values without a translation are kept unchanged.
      4. Write a lightweight JSON file ``<dossier_sauvegarde><nom_req>.json``.
      5. Delegate the ODS export to ``sauvegarde_ods``.

    Args:
        nom_req: name of the query, also used as the output file base name.
        critere: Overpass statements placed inside the union of the query.
        aire_de_recherche: area id substituted into the query text.
        overpass_query_fields: mapping of field name to a mapping holding at
            least the ``export_json`` and ``FR`` entries.

    Raises:
        Whatever ``run_overpass_query`` and ``run_reverse_geocoding`` raise
        on API failures; these are not caught here.
    """
    print("Nom requête : " + nom_req)

    overpass_query = "[out:json];\n(\n" + critere + "\n);\nout center;\n"
    overpass_query = overpass_query.replace("aire_de_recherche", aire_de_recherche)

    print("Execution requete overpass : \n" + overpass_query)

    data = run_overpass_query(overpass_query)
    elements = data["elements"]

    print("Nombre d'elements : " + str(len(elements)))
    print("Géocodage inversé : ", end="", flush=True)

    # TODO: optimise by calling the /reverse/csv/ service instead of the
    # per-point /reverse/ service.
    for element in elements:
        # Tolerate elements that carry no tags at all.
        tags = element.setdefault("tags", {})

        adresse_trouvee = _geocoder_element(element)

        # Translation; an unknown value is kept as-is rather than raising
        # KeyError.
        if "bicycle_parking" in tags:
            valeur = tags["bicycle_parking"]
            tags["bicycle_parking"] = trad_bicycle_parking.get(valeur, valeur)

        print("X" if adresse_trouvee else "-", end="", flush=True)

    print()
    print("Sauvegarde résultat format JSON/OSM")

    # A lighter JSON is rebuilt with only the data shown on the uMap map.
    export_json = {
        "version": data["version"],
        "generator": data["generator"] + " and ETALAB API",
        "osm3s": data["osm3s"],
        "elements": [
            _element_allege(element, overpass_query_fields) for element in elements
        ],
    }

    os.makedirs(dossier_sauvegarde, exist_ok=True)

    # json.dumps escapes non-ASCII characters by default, so the file content
    # does not depend on the chosen encoding.
    with open(dossier_sauvegarde + nom_req + ".json", "w", encoding="utf-8") as fichier_json:
        fichier_json.write(json.dumps(export_json))

    sauvegarde_ods(overpass_query_fields, data, nom_req)
|
|
|
|
|
|
def sauvegarde_ods(overpass_query_fields, data, nom_req):
    """Export the query results to ``<dossier_sauvegarde><nom_req>.ods``.

    The spreadsheet has a single sheet named ``resultats``. Its first row
    holds the field names (the keys of ``overpass_query_fields``); each
    following row holds, for one element of ``data["elements"]``, the value
    of every field found in the element's tags, or an empty string when the
    tag is absent.

    The ``capacity`` value is stored as an integer when it is a string made
    only of decimal digits, and left untouched otherwise.

    Args:
        overpass_query_fields: mapping whose keys are the exported fields.
        data: decoded Overpass reply; only ``data["elements"]`` is read.
        nom_req: name of the query, used as the output file base name.
    """
    champs = list(overpass_query_fields)

    # Header row as a plain list rather than a dict view.
    lignes = [champs]

    for element in data["elements"]:
        tags = element.get("tags", {})
        ligne = []
        for champ in champs:
            valeur = tags.get(champ, "")
            # str.isdigit() also accepts characters such as "²" that int()
            # rejects; str.isdecimal() only accepts what int() can convert.
            if champ == "capacity" and isinstance(valeur, str) and valeur.isdecimal():
                valeur = int(valeur)
            ligne.append(valeur)
        lignes.append(ligne)

    classeur = OrderedDict()
    classeur["resultats"] = lignes

    # Make the export usable on its own, without relying on a caller having
    # created the output directory beforehand.
    os.makedirs(dossier_sauvegarde, exist_ok=True)
    save_data(dossier_sauvegarde + nom_req + ".ods", classeur)

    print("Sauvegarde résultats format ODS pour " + nom_req)
|
|
|
|
|
|
# Run every configured query, trying each one at most max_retry times.
for req in requetes.reqs:

    for nb_essai in range(max_retry):

        try:
            executer_requete_et_exporter_resultats(req.nom, req.critere, aire_de_recherche, req.champs)
            break

        except errors.Api_error:
            # range(max_retry) ends at max_retry - 1, so this is the index of
            # the last allowed try. The previous comparison with max_retry
            # itself was never true: the script slept once more and then
            # silently went on to the next query.
            if nb_essai == max_retry - 1:
                print("trop d'erreurs d'API - abandon")
                # Non-zero status so that callers can detect the failure;
                # SystemExit does not depend on the `site` module the way
                # the `exit()` helper does.
                raise SystemExit(1)

            print("erreur API - on retente dans " + str(retry_delay) + "s")
            time.sleep(retry_delay)

print("Fini")
|