RDOO/recup_donnees_osm_overpass.py

145 lines
4.1 KiB
Python
Raw Normal View History

2021-09-26 12:52:37 +02:00
#!/usr/bin/env python3
2021-10-21 22:27:19 +02:00
# Copyright 2021 Olav63, SebF
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2021-10-10 16:50:27 +02:00
"""
Module principal : 
- récupération de données par appel à Overpass
- géocodage inverse
- export des données en JSON pour utilisation avec umap
- sauvegarde des données en ods
"""
2021-09-26 12:52:37 +02:00
import time
import os
import argparse
from osm_vc63 import errors
from osm_vc63 import requetes
from osm_vc63.utils import Utils
2021-09-26 12:52:37 +02:00
2021-10-10 16:50:27 +02:00
# Endpoints of the remote services used by this script.
# The Overpass endpoint is reached over HTTPS (the public instance serves the
# same path over TLS), consistent with the geocoding API below.
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
GEO_API_URL = "https://api-adresse.data.gouv.fr"

# Directory (relative to the working directory) where exports are written.
DOSSIER_SAUVEGARDE = "resultats/"

# Maximum number of attempts per request when the API fails.
MAX_RETRY = 4

# Delay, in seconds, between two attempts.
RETRY_DELAY = 120
2021-09-26 12:52:37 +02:00
# French labels for the values of the OSM "bicycle_parking" tag.
# Several OSM values intentionally share the same label.
TRAD_BICYCLE_PARKING = {
    "stands": "Arceaux",
    "wall_loops": "Pince roues",
    "rack": "Râteliers",
    "anchors": "Ancrage",
    "shed": "Abri collectif",
    "bollard": "Potelet",
    "lockers": "Abris individuels",
    "wide_stands": "Arceaux espacés",
    "ground_slots": "Fente dans le sol",
    "building": "Bâtiment",
    "informal": "Informel",
    "wave": "Râteliers",
    "streetpod": "Arceaux",
    "tree": "Arbre à bicyclettes",
    "crossbar": "Barre",
    "rope": "Câble",
    "two-tier": "Deux étages",
    "floor": "Sol",
    "handlebar_holder": "Accroche-guidons",
}
2021-10-09 15:10:36 +02:00
2021-09-26 12:52:37 +02:00
def init_argparse() -> argparse.ArgumentParser:
    """Build the command-line parser of the script.

    The parser accepts a single option, ``-z`` / ``--zone``: an integer
    identifying the geographic zone to inspect (default ``7406``).

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(
        description="Exporte les données de la Cyclosphère d'une zone géographique.",
        usage="%(prog)s [OPTIONS] ...",
    )

    # Settings of the only supported option, kept together for readability.
    zone_settings = {
        "type": int,
        "default": 7406,
        "help": "Choisir la zone géographique à inspecter. ",
    }
    cli.add_argument("-z", "--zone", **zone_settings)

    return cli
def main():
    """Run every configured request against Overpass and export the results.

    For each request in ``requetes.REQS`` the Overpass query is executed on
    the area derived from the ``--zone`` argument. When the query returns at
    least one element, the data is reverse-geocoded, the ``bicycle_parking``
    values are translated, and the result is saved as JSON and as ODS.

    A request that raises ``errors.ApiError`` is retried up to ``MAX_RETRY``
    times in total, waiting ``RETRY_DELAY`` seconds between attempts.

    Raises:
        SystemExit: with status 1 when a request still fails on its last
            allowed attempt.
    """
    parser = init_argparse()
    args = parser.parse_args()

    # The area id is computed by adding 3600000000 to the OSM object number.
    aire_de_recherche = str(3_600_000_000 + args.zone)

    for req in requetes.REQS:
        for nb_essai in range(MAX_RETRY):  # at most MAX_RETRY attempts
            try:
                utils = Utils(OVERPASS_URL, GEO_API_URL, DOSSIER_SAUVEGARDE)

                print(f"{75*'#'}\r\nRequête en cours : {req.nom}")

                # Overpass call
                data = utils.run_overpass_query(req.critere, aire_de_recherche)
                nb_resultats = len(data["elements"])
                print(f"{nb_resultats} résultats")

                if nb_resultats > 0:
                    # Reverse geocoding
                    data = utils.geocodage(data)

                    # Translation of the bicycle_parking values
                    data = utils.traduction(
                        "bicycle_parking", TRAD_BICYCLE_PARKING, data
                    )

                    # Save the exports
                    os.makedirs(DOSSIER_SAUVEGARDE, exist_ok=True)
                    export_json = utils.nettoyage_json_pour_umap(data, req.champs)

                    utils.save_as_json(export_json, req.nom)
                    utils.save_as_ods(req.champs, data, req.nom)

                # Go easy on Overpass: pause before the next request.
                time.sleep(30)
                break
            except errors.ApiError:
                # nb_essai runs from 0 to MAX_RETRY - 1, so the last attempt
                # is MAX_RETRY - 1. Comparing against MAX_RETRY would never
                # match and the script would never give up.
                if nb_essai == MAX_RETRY - 1:
                    print("trop d'erreurs d'API - abandon")
                    # Non-zero status so callers can detect the failure; does
                    # not rely on the interactive ``exit`` helper from ``site``.
                    raise SystemExit(1)

                print("erreur API - on retente dans " + str(RETRY_DELAY) + "s")

                time.sleep(RETRY_DELAY)

    print("\r\n ### Terminé ###")


if __name__ == "__main__":
    main()