# encoding: utf-8
"""
Tool used to download representatives data from the French National Assembly.

Fetches the open-data archive of all actors/mandates/organes, unpacks it
under ``../tmp/`` and removes the downloaded archive afterwards.
"""

import os
import shutil

import requests

url = "https://data.assemblee-nationale.fr/static/openData/repository/15/amo/tous_acteurs_mandats_organes_xi_legislature/AMO30_tous_acteurs_tous_mandats_tous_organes_historique.json.zip"
archive_path = "../tmp/assemblee_nationale.zip"

# FIX: ../tmp/ is git-ignored, so it does not exist on a fresh clone and the
# download below would fail with FileNotFoundError. Create it up front.
os.makedirs("../tmp", exist_ok=True)

# Cleaning old data
try:
    os.remove(archive_path)
except FileNotFoundError:
    # No file to remove
    pass
try:
    shutil.rmtree("../tmp/json")
except FileNotFoundError:
    # No folder to remove
    pass

# Download and extract data
print("Downloading archive")
with requests.get(url, stream=True) as result:
    result.raise_for_status()
    with open(archive_path, "wb") as f:
        # Stream in 8 KiB chunks so the large archive is never fully in memory.
        for chunk in result.iter_content(chunk_size=8192):
            f.write(chunk)
print("Unpacking archive")
shutil.unpack_archive(archive_path, "../tmp/")

# The extracted json/ tree is all we need; drop the archive.
os.remove(archive_path)
# encoding: utf-8
"""
Tool used to extract entities (organes) from French National Assembly data.

Reads every JSON file in ../tmp/json/organe and writes one CSV line per
entity to ../tmp/assemblee_nationale_entities.csv.
"""

import csv
from datetime import datetime
import json
import os

# Extract entities
print("Scraping entities")
with open("../tmp/assemblee_nationale_entities.csv", "w", encoding="utf-8", newline="") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["type_code", "country_iso2", "name", "code", "picture", "start", "end"])

    for filename in os.listdir("../tmp/json/organe"):
        # Loading informations
        with open(os.path.join("../tmp/json/organe", filename)) as file_handler:
            organe = json.load(file_handler)["organe"]
        type_raw = organe["codeType"]
        name = organe["libelle"]
        code = organe["uid"]
        # Life span of the organe: fall back to the approval date when no
        # start date is recorded.  (FIX: removed the unused `parent` local.)
        start = organe["viMoDe"].get("dateDebut", organe["viMoDe"].get("dateAgrement"))
        end = organe["viMoDe"].get("dateFin")
        # CSV line -- picture is always empty for entities, country is FR.
        writer.writerow([
            type_raw,
            "FR",
            name,
            code,
            "",
            start,
            end,
        ])
+""" + +import csv +from datetime import datetime +import json +import os + +from slugify import slugify + +# Extract representatives +print("Scraping memberships") +with open("../tmp/assemblee_nationale_memberships.csv", "w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerow(["representative_slug", "role_code", "entity_code", "start", "end"]) + + for filename in os.listdir("../tmp/json/acteur"): + # Loading informations + with open(os.path.join("../tmp/json/acteur", filename)) as file_handler: + acteur = json.load(file_handler)["acteur"] + identity = acteur["etatCivil"]["ident"] + representative_slug = slugify(f"{identity['prenom']} {identity['nom']}") + mandats = acteur["mandats"]["mandat"] + if isinstance(mandats, dict): + mandats = [mandats] + for mandat in mandats: + role_code = mandat["infosQualite"].get("codeQualite", "") + start = mandat.get("dateDebut", None) + end = mandat.get("dateFin", None) + organes = mandat["organes"]["organeRef"] + if isinstance(organes, str): + organes = [organes] + for entity_code in organes: + # CSV line + writer.writerow([ + representative_slug, + role_code, + entity_code, + start, + end, + ]) diff --git a/assemblee_nationale/scrap_representatives.py b/assemblee_nationale/scrap_representatives.py new file mode 100644 index 0000000..06e401f --- /dev/null +++ b/assemblee_nationale/scrap_representatives.py @@ -0,0 +1,63 @@ +# encoding: utf-8 +""" +Tool used to upload representatives from French National Assembly. 
+""" + +import csv +import json +import os +import sys + +# Extract representatives +with open("../tmp/assemblee_nationale_representatives.csv", "w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerow(["code", "name", "picture", "nationality", "sex", "birth_date", "birth_place", "job"]) + + for filename in os.listdir("../tmp/json/acteur"): + print(".", end="") + sys.stdout.flush() + # Loading informations + with open(os.path.join("../tmp/json/acteur", filename)) as file_handler: + acteur = json.load(file_handler)["acteur"] + uid = f"AN_{acteur['uid']['#text'][2:]}" + # Identity + identity = acteur["etatCivil"]["ident"] + if identity["civ"] == "M.": + sex = "M" + else: + sex = "F" + fullname = f"{identity['prenom']} {identity['nom']}" + birth = acteur["etatCivil"]["infoNaissance"] + birth_date = birth["dateNais"] + if isinstance(birth["villeNais"], dict): + birth_city = None + else: + birth_city = birth["villeNais"] + if isinstance(acteur["profession"]["libelleCourant"], dict): + job = None + else: + job = acteur["profession"]["libelleCourant"] + # Picture + legislatures = [] + mandats = acteur["mandats"]["mandat"] + if isinstance(mandats, dict): + mandats = [mandats] + for mandat in mandats: + if mandat["legislature"] is not None: + legislatures.append(int(mandat["legislature"])) + part_1 = "https://www2.assemblee-nationale.fr/static/tribun/" + if len(legislatures) == 0: + picture = f"" + else: + picture = f"{part_1}{max(legislatures)}/photos/{uid[3:]}.jpg" + # CSV line + writer.writerow([ + uid, + fullname, + picture, + "FR", + sex, + birth_date, + birth_city, + job, + ]) diff --git a/assemblee_nationale/scrap_roles.py b/assemblee_nationale/scrap_roles.py new file mode 100644 index 0000000..782ccce --- /dev/null +++ b/assemblee_nationale/scrap_roles.py @@ -0,0 +1,36 @@ +# encoding: utf-8 +""" +Tool used to upload representatives from French National Assembly. 
+""" + +import csv +import json +import os +import sys + +# Extract roles +print("Scraping roles") +with open("../tmp/assemblee_nationale_roles.csv", "w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerow(["code", "name"]) + + roles = [] + for filename in os.listdir("../tmp/json/acteur"): + print(".", end="") + sys.stdout.flush() + # Loading informations + with open(os.path.join("../tmp/json/acteur", filename)) as file_handler: + acteur = json.load(file_handler)["acteur"] + mandats = acteur["mandats"]["mandat"] + if isinstance(mandats, dict): + mandats = [mandats] + for mandat in mandats: + role = mandat["infosQualite"].get("codeQualite", "") + if role not in roles and role is not None: + roles.append(role) + for role in roles: + # CSV line + writer.writerow([ + role, + role, + ]) diff --git a/assemblee_nationale/scrap_types.py b/assemblee_nationale/scrap_types.py new file mode 100644 index 0000000..5f98744 --- /dev/null +++ b/assemblee_nationale/scrap_types.py @@ -0,0 +1,66 @@ +# encoding: utf-8 +""" +Tool used to upload representatives from French National Assembly. 
+""" + +import csv +import json +import os +import sys + +TYPES = { + "API": "Assemblée parlementaire internationale", + "ASSEMBLEE": "Assemblée Nationale", + "CJR": "Cour de Justice de la République", + "CMP": "Commission mixte paritaire", + "CNPE": "Commission d’enquête", + "CNPS": "Commission spéciale", + "COMNL": "Autre commission permanente", + "COMPER": "Commission permanente législative", + "COMSENAT": "Commission sénatoriale", + "COMSPSENAT": "Commission spéciale sénatoriale", + "CONFPT": "Conférence des Présidents", + "CONSTITU": "Conseil constitutionnel", + "DELEG": "Délégation parlementaire", + "DELEGBUREAU": "Délégation du Bureau de l'Assemblée Nationale", + "DELEGSENAT": "Délégation sénatoriale", + "GA": "Groupe d'amitié", + "GE": "Groupe d'études", + "GEVI": "Groupe d’études à vocation internationale", + "GOUVERNEMENT": "Gouvernement", + "GP": "Groupe politique", + "GROUPESENAT": "Groupe sénatorial", + "HCJ": "Haut Cour de Justice", + "MINISTERE": "Ministère", + "MISINFO": "Mission d’information", + "MISINFOCOM": "Mission d’information commune", + "MISINFOPRE": "Missions d’information de la conférence des Présidents", + "OFFPAR": "Office parlementaire ou délégation mixte", + "ORGEXTPARL": "Organisme extra parlementaire", + "PARPOL": "Parti politique", + "PRESREP": "Présidence de la République", + "SENAT": "Sénat", +} + + +# Extract types +print("Scraping types") +with open("../tmp/assemblee_nationale_types.csv", "w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) + writer.writerow(["code", "name"]) + + types = {} + for filename in os.listdir("../tmp/json/organe"): + print(".", end="") + sys.stdout.flush() + # Loading informations + with open(os.path.join("../tmp/json/organe", filename)) as file_handler: + organe = json.load(file_handler)["organe"] + if organe["codeType"].upper() not in types: + types[organe["codeType"].upper()] = TYPES.get(organe["codeType"].upper(), 
organe["codeType"].upper()) + for type_code in types: + # CSV line + writer.writerow([ + type_code, + types[type_code], + ]) diff --git a/lqdn/list_deputes.py b/lqdn/list_deputes.py new file mode 100644 index 0000000..8950a3a --- /dev/null +++ b/lqdn/list_deputes.py @@ -0,0 +1,30 @@ +# encoding: utf-8 + +from string import ascii_uppercase +from time import sleep +import sys + +from bs4 import BeautifulSoup +import requests + +url = "https://wiki.laquadrature.net/index.php?title=Cat%C3%A9gorie:D%C3%A9put%C3%A9s&pagefrom=" + +deputes = [] +for letter in ascii_uppercase: + # Do not DDOS lqdn wiki ;o) + sleep(.2) + content = requests.get(f"{url}{letter}").text + soup = BeautifulSoup(content, features="lxml") + + anchors = soup.find_all("a") + print(letter, end="") + for anchor in anchors: + if anchor.text == anchor.get("title") and not anchor.text.startswith("Deputes"): + deputes.append(anchor.text) + print(".", end="") + sys.stdout.flush() + print() + +with open("../tmp/liste_deputes.txt", "w", encoding="utf-8") as file_handler: + for depute in sorted(list(set(deputes))): + file_handler.write(f"{depute}\n") diff --git a/lqdn/scrap_representatives.py b/lqdn/scrap_representatives.py new file mode 100644 index 0000000..6265c28 --- /dev/null +++ b/lqdn/scrap_representatives.py @@ -0,0 +1,61 @@ +# encoding: utf-8 + +import csv +from datetime import datetime +import locale +import re +from string import ascii_uppercase +from time import sleep +import sys + +from bs4 import BeautifulSoup +import requests +from slugify import slugify + +url = "https://wiki.laquadrature.net/index.php?title=Cat%C3%A9gorie:D%C3%A9put%C3%A9s&pagefrom=" +locale.setlocale(locale.LC_ALL, "FR") + +deputes = [] +with open("../tmp/liste_deputes.txt", encoding="utf-8") as file_handler: + deputes = file_handler.read().splitlines() + +# Extract representatives +with open("../tmp/lqdn_representatives.csv", "w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile, delimiter=",", 
# encoding: utf-8
"""
Tool used to scrape representatives from the La Quadrature du Net wiki.

Builds ../tmp/lqdn_representatives.csv from the per-deputy wiki pages
listed in ../tmp/liste_deputes.txt.
"""

import csv
from datetime import datetime
import locale
import re
from string import ascii_uppercase
from time import sleep
import sys

from bs4 import BeautifulSoup
import requests
from slugify import slugify

url = "https://wiki.laquadrature.net/index.php?title=Cat%C3%A9gorie:D%C3%A9put%C3%A9s&pagefrom="
# FIX: the bare "FR" locale name only exists on Windows; try the POSIX
# spellings first so the French month names used by %d %B %Y parse anywhere.
for loc in ("fr_FR.UTF-8", "fr_FR", "FR"):
    try:
        locale.setlocale(locale.LC_ALL, loc)
        break
    except locale.Error:
        continue

deputes = []
with open("../tmp/liste_deputes.txt", encoding="utf-8") as file_handler:
    deputes = file_handler.read().splitlines()

# Extract representatives
with open("../tmp/lqdn_representatives.csv", "w", encoding="utf-8", newline="") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["name", "picture", "nationality", "sex", "birth_date", "birth_place", "job"])

    for depute in deputes:
        print(".", end="")
        sys.stdout.flush()
        # Do not DDOS lqdn wiki ;o)  (FIX: added, like the sibling scrapers)
        sleep(.2)
        # Loading informations
        content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
        soup = BeautifulSoup(content, features="lxml")
        deputy = soup.find("span", attrs={"class": "mw-headline"})
        # Identity -- headline looks like "Député : Prénom Nom, ..."
        fullname = deputy.text.split(":")[1].split(",")[0].strip()
        # "Né le" (masculine) vs "Née le" (feminine) determines the sex flag.
        if "Né le" in content:
            sex = "M"
        else:
            sex = "F"
        birth = soup.find(text=re.compile("Née? le")).parent.parent
        birth_date = birth.contents[1].strip()
        birth_date = datetime.strptime(birth_date, "%d %B %Y").strftime("%Y-%m-%d")
        birth_city = birth.contents[3].strip("(").split()[0].strip()
        try:
            job = soup.find(text=re.compile("Profession")).parent.parent
            job_name = job.contents[1].split(":")[1].strip()
        except (AttributeError, IndexError):
            # FIX: was a bare `except:`; only "no Profession line" or
            # unexpected markup should fall back to an empty job.
            job_name = ""
        # Picture
        picture = soup.find("img", attrs={"alt": fullname})["src"]
        picture = f"https://wiki.laquadrature.net{picture}"
        # CSV line
        writer.writerow([
            fullname,
            picture,
            "FR",
            sex,
            birth_date,
            birth_city,
            job_name,
        ])
# encoding: utf-8
"""
Tool used to scrape public stances (dated quotes) from the La Quadrature
du Net wiki, one CSV line per <h5> heading on each deputy page.
"""

from datetime import datetime
from string import ascii_uppercase
from time import sleep
import sys

from bs4 import BeautifulSoup
import csv
import requests
from slugify import slugify

with open("../tmp/liste_deputes.txt") as file_handler:
    deputes = file_handler.read().splitlines()

with open("../tmp/lqdn_stances.csv", "w", encoding="utf-8", newline="") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["name", "slug", "matter", "subject", "date", "extract", "source_url"])

    for depute in deputes:
        # Do not DDOS lqdn wiki ;o)
        sleep(.2)
        content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
        soup = BeautifulSoup(content, features="lxml")
        deputy = soup.find("span", attrs={"class": "mw-headline"})
        if deputy is not None:
            stance_author = deputy.text.split(",")[0].split(":")[1].strip()
        else:
            stance_author = depute
        headings = soup.find_all("h5")
        for heading in headings:
            # Headings look like "12/06/2013 Subject ..."; fall back to the
            # whole text when there is no parseable leading date.
            try:
                stance_date = datetime.strptime(heading.text.split()[0], "%d/%m/%Y")
                stance_subject = " ".join(heading.text.split()[1:])
            except (ValueError, IndexError):
                # FIX: was a bare `except:`; only date-parsing failures
                # (missing/invalid date, empty heading) are expected here.
                stance_date = None
                stance_subject = heading.text
            # TODO: Set the matter accordingly to the subject
            stance_matter = stance_subject.split(":")[0]
            if heading.find("a"):
                stance_link = heading.find("a").get("href")
            else:
                stance_link = None
            # Quote: the run of <blockquote> siblings following the heading.
            # FIX: the accumulator was named `quotes`, shadowing the list of
            # headings being iterated.
            extracts = []
            block = heading.find_next_sibling()
            while block is not None and block.name == "blockquote":
                extracts.append(block.text)
                block = block.find_next_sibling()
            stance_quote = "\n".join(extracts)
            writer.writerow([
                stance_author,
                slugify(stance_author),
                stance_matter,
                stance_subject,
                datetime.strftime(stance_date, "%Y-%m-%d") if stance_date is not None else None,
                stance_quote,
                stance_link,
            ])
        print(".", end="")
        sys.stdout.flush()
# encoding: utf-8
"""
Stream top-level JSON objects from a file containing concatenated JSON.
"""

import json

# Default block size
SIZE_BLOCK = 32768


def json_reader(filepath, size_block=SIZE_BLOCK):
    """Yield each top-level JSON object found in *filepath*.

    The file may contain several ``{...}`` objects concatenated back to
    back (optionally separated by whitespace); each complete object is
    decoded and yielded as soon as its closing brace is read, so the whole
    file never has to fit in memory.

    NOTE(review): an escaped quote (\\") inside a JSON string still toggles
    the in-string flag and can desynchronise the brace tracking -- fine for
    the dumps this targets, but not a general-purpose JSON splitter.
    """
    level = 0          # current brace nesting depth (outside strings)
    in_string = False  # True while inside a JSON string literal
    element = ""       # accumulated text of the object being read
    with open(filepath) as f:
        content = f.read(size_block)
        while content:
            for char in content:
                if char == '"':
                    # BUG FIX: was `in_string != in_string`, a no-op
                    # comparison -- the flag must be toggled, otherwise a
                    # brace inside a string corrupts the nesting count.
                    in_string = not in_string
                if not in_string and char == "{":
                    level += 1
                if level > 0:
                    element += char
                if not in_string and char == "}":
                    level -= 1
                    if level == 0:
                        yield json.loads(element)
                        element = ""
            # Reading next block; an empty read means EOF.
            content = f.read(size_block)
app.model.entity import EntityModel
from app.model.membership import MembershipModel
from app.model.representative import RepresentativeModel
from app.model.type import TypeModel
from command.json_reader import json_reader

# NOTE(review): `json` is already imported at the top of the file -- this is
# a duplicate import (harmless, but should be removed).
import json


def import_representatives(filepath):
    """Import MEP representatives from a parltrack JSON dump.

    Currently only loads the whole dump in memory and echoes one dot per
    record; the streaming json_reader() variant is commented out below.
    """
    click.echo("Importing representatives from parltrack")
    click.echo(" Reading file")
    with open(filepath) as f:
        # Loads the entire dump at once -- memory-hungry for large dumps.
        meps = json.load(f)
        for representative in meps:
            click.echo(".", nl=False)

    #for representative in json_reader(filepath):
    #    click.echo(".", nl=False)




def toto():
    """Work-in-progress import of French National Assembly open data.

    Wipes previously imported "Assemblée Nationale" rows, then walks a local
    unpacked copy of the AMO30 archive: organe files become Type/Entity rows,
    acteur files become Representative/Address/Contact/Membership rows.

    NOTE(review): references a name `country` that is never defined in this
    file -- calling this function as-is raises NameError; confirm where the
    country row is supposed to come from.
    NOTE(review): the function name is a placeholder and it is never called.
    """
    # Delete only things related to "Assemblée Nationale" !
    # NOTE(review): the filter on the first line is commented out, so ALL
    # memberships are deleted, not only the Assemblée Nationale ones.
    MembershipModel.query.delete() #filter_by(source="Assemblée Nationale").delete()
    RepresentativeModel.query.filter_by(source="Assemblée Nationale").delete()
    AddressModel.query.filter_by(source="Assemblée Nationale").delete()
    ContactModel.query.filter_by(source="Assemblée Nationale").delete()
    EntityModel.query.filter_by(source="Assemblée Nationale").delete()
    TypeModel.query.filter_by(source="Assemblée Nationale").delete()

    url = "https://data.assemblee-nationale.fr/static/openData/repository/15/amo/tous_acteurs_mandats_organes_xi_legislature/AMO30_tous_acteurs_tous_mandats_tous_organes_historique.json.zip"
    # NOTE(review): dead code -- the download/unzip path is disabled while
    # testing against the local copy pointed to by `tmpdir` below.
    if False:
        datas = BytesIO()
        result = requests.get(url, stream=True)
        datas.write(result.content)
        datas.seek(0)
        with tempfile.TemporaryDirectory() as tmpdir:
            with zipfile.ZipFile(datas, "r") as zip_ref:
                zip_ref.extractall(tmpdir)
            print(tmpdir)
            for root, dirs, files in os.walk(tmpdir):
                if root.endswith("acteur"):
                    for filename in files:
                        print(os.path.join(root, filename))
                    for filename in files[:1]:
                        with open(os.path.join(root, filename)) as filehandler:
                            data = json.load(filehandler)
                            print(json.dumps(data, indent=2))
    # Testing
    # NOTE(review): hard-coded developer-machine path; replace with the
    # download path above once that branch is re-enabled.
    tmpdir = "C:\\Users\\tbouchet\\Downloads\\json"
    click.echo(" ", nl=False)
    # Pass 1: organe files -> Type and Entity rows.
    for root, dirs, files in os.walk(tmpdir):
        if root.endswith("organe"):
            with click.progressbar(files, label="Entities") as progress_files:
                entities = []  # NOTE(review): never used
                for filename in progress_files:
                    #print(filename)
                    with open(os.path.join(root, filename)) as filehandler:
                        data = json.load(filehandler)["organe"]

                        # Type
                        # file:///C:/Users/tbouchet/Downloads/html/Schemas_Entites/AMO/Schemas_Organes.html
                        # Human-readable labels for the organe type codes.
                        # NOTE(review): rebuilt on every iteration; could be
                        # a module-level constant.
                        type_types = {
                            "API": "Assemblée parlementaire internationale",
                            "ASSEMBLEE": "Assemblée nationale",
                            "ASSEXT": "Autres conseils",
                            "ASSTOM": "Assemblée territoriale d’Outre-Mer",
                            "CES": "Conseil économique, social et environnemental",
                            "CJR": "Cour de justice de la République",
                            "CMP": "Commissions mixtes paritaires",
                            "CNPE": "Commissions d’enquêtes",
                            "CNPS": "Commissions spéciales",
                            "COMMUNE": "Conseil Municipal",
                            "COMNL": "Autres commissions permanentes",
                            "COMPER": "Commissions permanentes législatives",
                            "COMSENAT": "Commissions du Sénat",
                            "COMSPSENAT": "Commissions spéciales du Sénat",
                            "CONFPT": "CONFPT",
                            "CONSTITU": "Conseil constitutionnel",
                            "DELEG": "Délégation parlementaire",
                            "DELEGBUREAU": "Délégation du Bureau (de l’AN)",
                            "DELEGSENAT": "Délégation du Sénat",
                            "DEPARTEMENT": "Conseil général ou départemental",
                            "EUROPE": "Mandat européen",
                            "GA": "Groupe d’amitié",
                            "GE": "Groupe d’études",
                            "GEVI": "Groupe d’études à vocation internationale",
                            "GOUVERNEMENT": "Gouvernement",
                            "GP": "Groupe politique",
                            "GROUPESENAT": "Groupe Sénat",
                            "HCJ": "Haute Cour de justice",
                            "INTCO": "Intercommunalité",
                            "MINISTERE": "Ministère",
                            "MISINFO": "Missions d’informations",
                            "MISINFOCOM": "Missions d’information communes",
                            "MISINFOPRE": "Missions d’information de la conférence des Présidents",
                            "OFFPAR": "Office parlementaire ou délégation mixte",
                            "ORGAINT": "Organisme international",
                            "ORGEXTPARL": "Organisme extra parlementaire",
                            "PARPOL": "Parti Politique",
                            "PRESREP": "Présidence de la République",
                            "REGION": "Conseil régional",
                            "SENAT": "Mandat de sénateur",
                        }
                        # NOTE(review): `type` shadows the builtin.
                        type = TypeModel.query.filter_by(name = type_types[data["codeType"]]).first()
                        if type is None:
                            # First time this organe type is met: create it.
                            type = TypeModel()
                            type.source = "Assemblée Nationale"
                            type.source_uid = data["codeType"]
                            type.name = type_types[data["codeType"]]
                            type.save()

                        # Entity
                        entity = EntityModel(
                            source = "Assemblée Nationale",
                            source_uid = data["uid"],
                            type_id = type.id,
                            name = data["libelle"],
                            code = data["libelleAbrev"],
                            country_id = country.id,
                        )
                        if data["organeParent"] is not None:
                            # Link to the parent organe when it was already
                            # imported; files are not topologically sorted,
                            # so the parent may not exist yet -- in that
                            # case the orphan is only logged.
                            parent = EntityModel.query.filter_by(source_uid=data["organeParent"]).first()
                            if parent is not None:
                                entity.parent_id = parent.id
                            else:
                                print(data["uid"], data["organeParent"])
                        entity.save()

    # Pass 2: acteur files -> Representative/Address/Contact/Membership rows.
    for root, dirs, files in os.walk(tmpdir):
        if root.endswith("acteur"):
            with click.progressbar(files, label="Representatives") as progress_files:
                for filename in progress_files:
                    with open(os.path.join(root, filename)) as filehandler:
                        data = json.load(filehandler)["acteur"]

                        # Representative
                        representative = RepresentativeModel()
                        representative.source = "Assemblée Nationale"
                        representative.source_uid = data["uid"]["#text"]
                        nom = data["etatCivil"]["ident"]["nom"]
                        prenom = data["etatCivil"]["ident"]["prenom"]
                        representative.name = f"{prenom} {nom}"
                        representative.nationality_id = country.id
                        representative.birth_date = datetime.strptime(
                            data["etatCivil"]["infoNaissance"]["dateNais"], "%Y-%m-%d"
                        )
                        # Missing values are serialized as dicts, real ones
                        # as strings -- hence the isinstance guards.
                        if isinstance(data["etatCivil"]["infoNaissance"]["villeNais"], str):
                            representative.birth_place = data["etatCivil"]["infoNaissance"][
                                "villeNais"
                            ]
                        if isinstance(data["profession"]["libelleCourant"], str):
                            representative.profession = data["profession"]["libelleCourant"]
                        representative.save()

                        # Address
                        if data["adresses"].get("adresse", "") != "":
                            # Postal address type codes (see AMO schema).
                            address_types = {
                                "0": "Parliament address",
                                "1": "Address",
                                "2": "Constituency address",
                            }

                            def manage_address(data_address):
                                # Persist one postal address when its type is known.
                                if data_address["type"] in address_types:
                                    address = AddressModel()
                                    address.representative_id = representative.id
                                    address.source = "Assemblée Nationale"
                                    address.source_uid = data_address["uid"]
                                    address.name = address_types[data_address["type"]]
                                    address.country_id = country.id
                                    address.number = data_address["numeroRue"]
                                    address.street = data_address["nomRue"]
                                    address.miscellaneous = data_address[
                                        "complementAdresse"
                                    ]
                                    address.city = data_address["ville"]
                                    address.zipcode = data_address["codePostal"]
                                    address.save()

                            # "adresse" is a dict for one entry, a list for several.
                            if isinstance(data["adresses"]["adresse"], list):
                                for data_address in data["adresses"]["adresse"]:
                                    manage_address(data_address)
                            elif isinstance(data["adresses"]["adresse"], dict):
                                manage_address(data["adresses"]["adresse"])

                            # Contact
                            # Electronic/phone contact type codes (same "adresse" records).
                            contact_types = {
                                "3": "Phone (Press contact)",
                                "11": "Phone",
                                "12": "Fax",
                                "15": "Email",
                                "22": "Website",
                                "23": "Senate URL",
                                "24": "Twitter",
                                "25": "Facebook",
                            }

                            def manage_contact(data_contact):
                                # Persist one contact when its type is known,
                                # linking it to its postal address if any.
                                if data_contact["type"] in contact_types:
                                    contact = ContactModel()
                                    contact.representative_id = representative.id
                                    contact.source = "Assemblée Nationale"
                                    contact.source_uid = data_contact["uid"]
                                    if data_contact["adresseDeRattachement"] is not None:
                                        address = AddressModel.query.filter_by(
                                            source_uid=data_contact["adresseDeRattachement"]
                                        ).first()
                                        if address is not None:
                                            contact.address_id = address.id
                                    contact.name = contact_types[data_contact["type"]]
                                    contact.value = data_contact["valElec"]
                                    contact.save()

                            if isinstance(data["adresses"]["adresse"], list):
                                for data_contact in data["adresses"]["adresse"]:
                                    manage_contact(data_contact)
                            elif isinstance(data["adresses"]["adresse"], dict):
                                manage_contact(data["adresses"]["adresse"])

                            # Unknown addresses ?
                            # Log any "adresse" whose type code is neither a
                            # postal nor a contact type, so new codes surface.
                            if isinstance(data["adresses"]["adresse"], list):
                                for data_address in data["adresses"]["adresse"]:
                                    if data_address["type"] not in dict(
                                        address_types, **contact_types
                                    ):
                                        print(
                                            f" => Unkown address type : {data_address['type']} in file (unknown) : {data_address['typeLibelle']}"
                                        )
                            elif isinstance(data["adresses"]["adresse"], dict):
                                data_address = data["adresses"]["adresse"]
                                if data_address["type"] not in dict(
                                    address_types, **contact_types
                                ):
                                    print(
                                        f" => Unkown address type : {data_address['type']} in file (unknown) : {data_address['typeLibelle']}"
                                    )

                        if data["mandats"].get("mandat", "") != "":
                            # Membership
                            # NOTE(review): only "Membre" mandates are kept;
                            # every other role is silently skipped.
                            membership_types = {
                                "Membre": "Member",
                            }

                            def manage_membership(data_membership):
                                # Persist one mandate when its role is recognised
                                # and its target organe was imported in pass 1.
                                if data_membership["infosQualite"]["codeQualite"] in membership_types:
                                    entity = EntityModel.query.filter_by(source_uid=data_membership["organes"]["organeRef"]).first()
                                    if entity is None:
                                        print("Organe inconnu", data_membership["organes"]["organeRef"])
                                        return
                                    membership = MembershipModel()
                                    membership.representative_id = representative.id
                                    membership.role = membership_types[data_membership["infosQualite"]["codeQualite"]]
                                    membership.country_id = country.id
                                    if data_membership["dateDebut"] is not None:
                                        membership.start = datetime.strptime(
                                            data_membership["dateDebut"], "%Y-%m-%d"
                                        )
                                    if data_membership["dateFin"] is not None:
                                        membership.end = datetime.strptime(
                                            data_membership["dateFin"], "%Y-%m-%d"
                                        )
                                    membership.entity_id = entity.id
                                    membership.save()

                            if isinstance(data["mandats"]["mandat"], list):
                                for data_membership in data["mandats"]["mandat"]:
                                    manage_membership(data_membership)
                            elif isinstance(data["mandats"]["mandat"], dict):
                                manage_membership(data["mandats"]["mandat"])