287 lines
15 KiB
Python
287 lines
15 KiB
Python
# encoding: utf-8
|
||
|
||
from datetime import datetime
|
||
from io import StringIO, BytesIO
|
||
import json
|
||
import os
|
||
import tempfile
|
||
import zipfile
|
||
|
||
import click
|
||
import requests
|
||
|
||
from app.model.address import AddressModel
|
||
from app.model.contact import ContactModel
|
||
from app.model.entity import EntityModel
|
||
from app.model.membership import MembershipModel
|
||
from app.model.representative import RepresentativeModel
|
||
from app.model.type import TypeModel
|
||
from command.json_reader import json_reader
|
||
|
||
import json
|
||
|
||
|
||
def import_representatives(filepath):
|
||
click.echo("Importing representatives from parltrack")
|
||
click.echo(" Reading file")
|
||
with open(filepath) as f:
|
||
meps = json.load(f)
|
||
for representative in meps:
|
||
click.echo(".", nl=False)
|
||
|
||
#for representative in json_reader(filepath):
|
||
# click.echo(".", nl=False)
|
||
|
||
|
||
|
||
|
||
def toto():
|
||
# Delete only things related to "Assemblée Nationale" !
|
||
MembershipModel.query.delete() #filter_by(source="Assemblée Nationale").delete()
|
||
RepresentativeModel.query.filter_by(source="Assemblée Nationale").delete()
|
||
AddressModel.query.filter_by(source="Assemblée Nationale").delete()
|
||
ContactModel.query.filter_by(source="Assemblée Nationale").delete()
|
||
EntityModel.query.filter_by(source="Assemblée Nationale").delete()
|
||
TypeModel.query.filter_by(source="Assemblée Nationale").delete()
|
||
|
||
url = "https://data.assemblee-nationale.fr/static/openData/repository/15/amo/tous_acteurs_mandats_organes_xi_legislature/AMO30_tous_acteurs_tous_mandats_tous_organes_historique.json.zip"
|
||
if False:
|
||
datas = BytesIO()
|
||
result = requests.get(url, stream=True)
|
||
datas.write(result.content)
|
||
datas.seek(0)
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
with zipfile.ZipFile(datas, "r") as zip_ref:
|
||
zip_ref.extractall(tmpdir)
|
||
print(tmpdir)
|
||
for root, dirs, files in os.walk(tmpdir):
|
||
if root.endswith("acteur"):
|
||
for filename in files:
|
||
print(os.path.join(root, filename))
|
||
for filename in files[:1]:
|
||
with open(os.path.join(root, filename)) as filehandler:
|
||
data = json.load(filehandler)
|
||
print(json.dumps(data, indent=2))
|
||
# Testing
|
||
tmpdir = "C:\\Users\\tbouchet\\Downloads\\json"
|
||
click.echo(" ", nl=False)
|
||
for root, dirs, files in os.walk(tmpdir):
|
||
if root.endswith("organe"):
|
||
with click.progressbar(files, label="Entities") as progress_files:
|
||
entities = []
|
||
for filename in progress_files:
|
||
#print(filename)
|
||
with open(os.path.join(root, filename)) as filehandler:
|
||
data = json.load(filehandler)["organe"]
|
||
|
||
# Type
|
||
# file:///C:/Users/tbouchet/Downloads/html/Schemas_Entites/AMO/Schemas_Organes.html
|
||
type_types = {
|
||
"API": "Assemblée parlementaire internationale",
|
||
"ASSEMBLEE": "Assemblée nationale",
|
||
"ASSEXT": "Autres conseils",
|
||
"ASSTOM": "Assemblée territoriale d’Outre-Mer",
|
||
"CES": "Conseil économique, social et environnemental",
|
||
"CJR": "Cour de justice de la République",
|
||
"CMP": "Commissions mixtes paritaires",
|
||
"CNPE": "Commissions d’enquêtes",
|
||
"CNPS": "Commissions spéciales",
|
||
"COMMUNE": "Conseil Municipal",
|
||
"COMNL": "Autres commissions permanentes",
|
||
"COMPER": "Commissions permanentes législatives",
|
||
"COMSENAT": "Commissions du Sénat",
|
||
"COMSPSENAT": "Commissions spéciales du Sénat",
|
||
"CONFPT": "CONFPT",
|
||
"CONSTITU": "Conseil constitutionnel",
|
||
"DELEG": "Délégation parlementaire",
|
||
"DELEGBUREAU": "Délégation du Bureau (de l’AN)",
|
||
"DELEGSENAT": "Délégation du Sénat",
|
||
"DEPARTEMENT": "Conseil général ou départemental",
|
||
"EUROPE": "Mandat européen",
|
||
"GA": "Groupe d’amitié",
|
||
"GE": "Groupe d’études",
|
||
"GEVI": "Groupe d’études à vocation internationale",
|
||
"GOUVERNEMENT": "Gouvernement",
|
||
"GP": "Groupe politique",
|
||
"GROUPESENAT": "Groupe Sénat",
|
||
"HCJ": "Haute Cour de justice",
|
||
"INTCO": "Intercommunalité",
|
||
"MINISTERE": "Ministère",
|
||
"MISINFO": "Missions d’informations",
|
||
"MISINFOCOM": "Missions d’information communes",
|
||
"MISINFOPRE": "Missions d’information de la conférence des Présidents",
|
||
"OFFPAR": "Office parlementaire ou délégation mixte",
|
||
"ORGAINT": "Organisme international",
|
||
"ORGEXTPARL": "Organisme extra parlementaire",
|
||
"PARPOL": "Parti Politique",
|
||
"PRESREP": "Présidence de la République",
|
||
"REGION": "Conseil régional",
|
||
"SENAT": "Mandat de sénateur",
|
||
}
|
||
type = TypeModel.query.filter_by(name = type_types[data["codeType"]]).first()
|
||
if type is None:
|
||
type = TypeModel()
|
||
type.source = "Assemblée Nationale"
|
||
type.source_uid = data["codeType"]
|
||
type.name = type_types[data["codeType"]]
|
||
type.save()
|
||
|
||
# Entity
|
||
entity = EntityModel(
|
||
source = "Assemblée Nationale",
|
||
source_uid = data["uid"],
|
||
type_id = type.id,
|
||
name = data["libelle"],
|
||
code = data["libelleAbrev"],
|
||
country_id = country.id,
|
||
)
|
||
if data["organeParent"] is not None:
|
||
parent = EntityModel.query.filter_by(source_uid=data["organeParent"]).first()
|
||
if parent is not None:
|
||
entity.parent_id = parent.id
|
||
else:
|
||
print(data["uid"], data["organeParent"])
|
||
entity.save()
|
||
|
||
for root, dirs, files in os.walk(tmpdir):
|
||
if root.endswith("acteur"):
|
||
with click.progressbar(files, label="Representatives") as progress_files:
|
||
for filename in progress_files:
|
||
with open(os.path.join(root, filename)) as filehandler:
|
||
data = json.load(filehandler)["acteur"]
|
||
|
||
# Representative
|
||
representative = RepresentativeModel()
|
||
representative.source = "Assemblée Nationale"
|
||
representative.source_uid = data["uid"]["#text"]
|
||
nom = data["etatCivil"]["ident"]["nom"]
|
||
prenom = data["etatCivil"]["ident"]["prenom"]
|
||
representative.name = f"{prenom} {nom}"
|
||
representative.nationality_id = country.id
|
||
representative.birth_date = datetime.strptime(
|
||
data["etatCivil"]["infoNaissance"]["dateNais"], "%Y-%m-%d"
|
||
)
|
||
if isinstance(data["etatCivil"]["infoNaissance"]["villeNais"], str):
|
||
representative.birth_place = data["etatCivil"]["infoNaissance"][
|
||
"villeNais"
|
||
]
|
||
if isinstance(data["profession"]["libelleCourant"], str):
|
||
representative.profession = data["profession"]["libelleCourant"]
|
||
representative.save()
|
||
|
||
# Address
|
||
if data["adresses"].get("adresse", "") != "":
|
||
address_types = {
|
||
"0": "Parliament address",
|
||
"1": "Address",
|
||
"2": "Constituency address",
|
||
}
|
||
|
||
def manage_address(data_address):
|
||
if data_address["type"] in address_types:
|
||
address = AddressModel()
|
||
address.representative_id = representative.id
|
||
address.source = "Assemblée Nationale"
|
||
address.source_uid = data_address["uid"]
|
||
address.name = address_types[data_address["type"]]
|
||
address.country_id = country.id
|
||
address.number = data_address["numeroRue"]
|
||
address.street = data_address["nomRue"]
|
||
address.miscellaneous = data_address[
|
||
"complementAdresse"
|
||
]
|
||
address.city = data_address["ville"]
|
||
address.zipcode = data_address["codePostal"]
|
||
address.save()
|
||
|
||
if isinstance(data["adresses"]["adresse"], list):
|
||
for data_address in data["adresses"]["adresse"]:
|
||
manage_address(data_address)
|
||
elif isinstance(data["adresses"]["adresse"], dict):
|
||
manage_address(data["adresses"]["adresse"])
|
||
|
||
# Contact
|
||
contact_types = {
|
||
"3": "Phone (Press contact)",
|
||
"11": "Phone",
|
||
"12": "Fax",
|
||
"15": "Email",
|
||
"22": "Website",
|
||
"23": "Senate URL",
|
||
"24": "Twitter",
|
||
"25": "Facebook",
|
||
}
|
||
|
||
def manage_contact(data_contact):
|
||
if data_contact["type"] in contact_types:
|
||
contact = ContactModel()
|
||
contact.representative_id = representative.id
|
||
contact.source = "Assemblée Nationale"
|
||
contact.source_uid = data_contact["uid"]
|
||
if data_contact["adresseDeRattachement"] is not None:
|
||
address = AddressModel.query.filter_by(
|
||
source_uid=data_contact["adresseDeRattachement"]
|
||
).first()
|
||
if address is not None:
|
||
contact.address_id = address.id
|
||
contact.name = contact_types[data_contact["type"]]
|
||
contact.value = data_contact["valElec"]
|
||
contact.save()
|
||
|
||
if isinstance(data["adresses"]["adresse"], list):
|
||
for data_contact in data["adresses"]["adresse"]:
|
||
manage_contact(data_contact)
|
||
elif isinstance(data["adresses"]["adresse"], dict):
|
||
manage_contact(data["adresses"]["adresse"])
|
||
|
||
# Unknown addresses ?
|
||
if isinstance(data["adresses"]["adresse"], list):
|
||
for data_address in data["adresses"]["adresse"]:
|
||
if data_address["type"] not in dict(
|
||
address_types, **contact_types
|
||
):
|
||
print(
|
||
f" => Unkown address type : {data_address['type']} in file {filename} : {data_address['typeLibelle']}"
|
||
)
|
||
elif isinstance(data["adresses"]["adresse"], dict):
|
||
data_address = data["adresses"]["adresse"]
|
||
if data_address["type"] not in dict(
|
||
address_types, **contact_types
|
||
):
|
||
print(
|
||
f" => Unkown address type : {data_address['type']} in file {filename} : {data_address['typeLibelle']}"
|
||
)
|
||
|
||
if data["mandats"].get("mandat", "") != "":
|
||
# Membership
|
||
membership_types = {
|
||
"Membre": "Member",
|
||
}
|
||
|
||
def manage_membership(data_membership):
|
||
if data_membership["infosQualite"]["codeQualite"] in membership_types:
|
||
entity = EntityModel.query.filter_by(source_uid=data_membership["organes"]["organeRef"]).first()
|
||
if entity is None:
|
||
print("Organe inconnu", data_membership["organes"]["organeRef"])
|
||
return
|
||
membership = MembershipModel()
|
||
membership.representative_id = representative.id
|
||
membership.role = membership_types[data_membership["infosQualite"]["codeQualite"]]
|
||
membership.country_id = country.id
|
||
if data_membership["dateDebut"] is not None:
|
||
membership.start = datetime.strptime(
|
||
data_membership["dateDebut"], "%Y-%m-%d"
|
||
)
|
||
if data_membership["dateFin"] is not None:
|
||
membership.end = datetime.strptime(
|
||
data_membership["dateFin"], "%Y-%m-%d"
|
||
)
|
||
membership.entity_id = entity.id
|
||
membership.save()
|
||
|
||
if isinstance(data["mandats"]["mandat"], list):
|
||
for data_membership in data["mandats"]["mandat"]:
|
||
manage_membership(data_membership)
|
||
elif isinstance(data["mandats"]["mandat"], dict):
|
||
manage_membership(data["mandats"]["mandat"])
|