Mindiell 2022-07-08 13:51:54 +02:00
parent 806d51e7a5
commit 5300a08f39
16 changed files with 286 additions and 420 deletions


@@ -0,0 +1,27 @@
# Assemblée Nationale
This institution is the lower house of the French Parliament.
First of all, you need to load the historical data.
These tools are used to scrape:
* Representatives' basic information
  * AN code, name, nationality, sex, birth date and place, job
* Types of entities
  * Code and name of the different types of entities linked to the house
* Entities
  * Type, country, name, code, picture, start and end of each entity
* Roles of representatives
  * Code and name
* Memberships
  * Links between representatives and entities: start and end of their memberships
## Schemas
https://www.assemblee-nationale.fr/opendata/Schemas_Entites/AMO/Schemas_Organes.html
Keep in mind that all this data can be used to do a first push into politikorama and
that it should be subject to human proof-reading.
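As a quick illustration of that proof-reading step, here is a minimal sketch (not part of these tools) that previews the entities CSV written by the scraper; the folder and column names match the scripts below, and the sketch simply assumes the scraper has already been run.

```python
# Preview the entities CSV produced by the Assemblée Nationale scraper.
import csv
import os

data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
csv_path = os.path.join(
    data_root, "assemblee_nationale", "assemblee_nationale_entities.csv"
)

with open(csv_path, encoding="utf-8", newline="") as csvfile:
    for row in csv.DictReader(csvfile):
        # Columns written by the scraper:
        # type_code, country_iso2, name, code, picture, start, end
        print(row["code"], row["name"], row["start"], row["end"])
```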


@@ -10,14 +10,18 @@ import requests
url = "https://data.assemblee-nationale.fr/static/openData/repository/15/amo/tous_acteurs_mandats_organes_xi_legislature/AMO30_tous_acteurs_tous_mandats_tous_organes_historique.json.zip"
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
data_source = os.path.join(data_root, "assemblee_nationale.zip")
data_target = os.path.join(data_root, "json")
# Cleaning old data
try:
os.remove("../tmp/assemblee_nationale.zip")
os.remove(data_source)
except FileNotFoundError:
# No file to remove
pass
try:
shutil.rmtree("../tmp/json")
shutil.rmtree(data_target)
except FileNotFoundError:
# No folder to remove
pass
@@ -26,10 +30,10 @@ except FileNotFoundError:
print("Downloading archive")
with requests.get(url, stream=True) as result:
result.raise_for_status()
with open("../tmp/assemblee_nationale.zip", "wb") as f:
with open(data_source, "wb") as f:
for chunk in result.iter_content(chunk_size=8192):
f.write(chunk)
print("Unpacking archive")
shutil.unpack_archive("../tmp/assemblee_nationale.zip", "../tmp/")
shutil.unpack_archive(data_source, data_root)
os.remove("../tmp/assemblee_nationale.zip")
os.remove(data_source)


@@ -1 +0,0 @@
https://www.assemblee-nationale.fr/opendata/Schemas_Entites/AMO/Schemas_Organes.html


@@ -7,16 +7,23 @@ import csv
from datetime import datetime
import json
import os
import sys
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
target_root = os.path.join(data_root, "assemblee_nationale")
# Extract representatives
print("Scraping entities")
with open("../tmp/assemblee_nationale_entities.csv", "w", encoding="utf-8", newline="") as csvfile:
data_source = os.path.join(data_root, "json/organe")
data_target = os.path.join(target_root, "assemblee_nationale_entities.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["type_code", "country_iso2", "name", "code", "picture", "start", "end"])
for filename in os.listdir("../tmp/json/organe"):
for filename in os.listdir(data_source):
print(".", end="")
sys.stdout.flush()
# Loading informations
with open(os.path.join("../tmp/json/organe", filename)) as file_handler:
with open(os.path.join(data_source, filename)) as file_handler:
organe = json.load(file_handler)["organe"]
type_raw = organe["codeType"]
name = organe["libelle"]


@@ -7,18 +7,25 @@ import csv
from datetime import datetime
import json
import os
import sys
from slugify import slugify
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
target_root = os.path.join(data_root, "assemblee_nationale")
# Extract representatives
print("Scraping memberships")
with open("../tmp/assemblee_nationale_memberships.csv", "w", encoding="utf-8", newline="") as csvfile:
data_source = os.path.join(data_root, "json/acteur")
data_target = os.path.join(target_root, "assemblee_nationale_memberships.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["representative_slug", "role_code", "entity_code", "start", "end"])
for filename in os.listdir("../tmp/json/acteur"):
for filename in os.listdir(data_source):
print(".", end="")
sys.stdout.flush()
# Loading informations
with open(os.path.join("../tmp/json/acteur", filename)) as file_handler:
with open(os.path.join(data_source, filename)) as file_handler:
acteur = json.load(file_handler)["acteur"]
identity = acteur["etatCivil"]["ident"]
representative_slug = slugify(f"{identity['prenom']} {identity['nom']}")
@@ -26,6 +33,8 @@ with open("../tmp/assemblee_nationale_memberships.csv", "w", encoding="utf-8", n
if isinstance(mandats, dict):
mandats = [mandats]
for mandat in mandats:
print(".", end="")
sys.stdout.flush()
role_code = mandat["infosQualite"].get("codeQualite", "")
start = mandat.get("dateDebut", None)
end = mandat.get("dateFin", None)


@@ -8,16 +8,21 @@ import json
import os
import sys
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
target_root = os.path.join(data_root, "assemblee_nationale")
# Extract representatives
with open("../tmp/assemblee_nationale_representatives.csv", "w", encoding="utf-8", newline="") as csvfile:
data_source = os.path.join(data_root, "json/acteur")
data_target = os.path.join(target_root, "assemblee_nationale_representatives.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["code", "name", "picture", "nationality", "sex", "birth_date", "birth_place", "job"])
for filename in os.listdir("../tmp/json/acteur"):
for filename in os.listdir(data_source):
print(".", end="")
sys.stdout.flush()
# Loading informations
with open(os.path.join("../tmp/json/acteur", filename)) as file_handler:
with open(os.path.join(data_source, filename)) as file_handler:
acteur = json.load(file_handler)["acteur"]
uid = f"AN_{acteur['uid']['#text'][2:]}"
# Identity


@@ -8,18 +8,22 @@ import json
import os
import sys
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
target_root = os.path.join(data_root, "assemblee_nationale")
# Extract roles
print("Scraping roles")
with open("../tmp/assemblee_nationale_roles.csv", "w", encoding="utf-8", newline="") as csvfile:
data_source = os.path.join(data_root, "json/acteur")
data_target = os.path.join(target_root, "assemblee_nationale_roles.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["code", "name"])
roles = []
for filename in os.listdir("../tmp/json/acteur"):
for filename in os.listdir(data_source):
print(".", end="")
sys.stdout.flush()
# Loading informations
with open(os.path.join("../tmp/json/acteur", filename)) as file_handler:
with open(os.path.join(data_source, filename)) as file_handler:
acteur = json.load(file_handler)["acteur"]
mandats = acteur["mandats"]["mandat"]
if isinstance(mandats, dict):


@@ -43,18 +43,22 @@ TYPES = {
}
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
target_root = os.path.join(data_root, "assemblee_nationale")
# Extract types
print("Scraping types")
with open("../tmp/assemblee_nationale_types.csv", "w", encoding="utf-8", newline="") as csvfile:
data_source = os.path.join(data_root, "json/organe")
data_target = os.path.join(target_root, "assemblee_nationale_types.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["code", "name"])
types = {}
for filename in os.listdir("../tmp/json/organe"):
for filename in os.listdir(data_source):
print(".", end="")
sys.stdout.flush()
# Loading informations
with open(os.path.join("../tmp/json/organe", filename)) as file_handler:
with open(os.path.join(data_source, filename)) as file_handler:
organe = json.load(file_handler)["organe"]
if organe["codeType"].upper() not in types:
types[organe["codeType"].upper()] = TYPES.get(organe["codeType"].upper(), organe["codeType"].upper())

lqdn/README.md (new file, 18 lines)

@@ -0,0 +1,18 @@
# La Quadrature du Net
This organisation has collected a lot of important data about French representatives
over the years in its wiki (MediaWiki engine).
These tools are used to scrape:
* Representatives' basic information
  * Name, nationality, sex, birth date and place, job
* Stances by representatives
  * The scraper tries to find a matter and a subject for each stance, with a date and a source URL
* Votes by representatives
  * The scraper tries to find a matter and a subject for each vote, with a date and a source URL
  * The vote result itself is not recorded
Keep in mind that all this data can be used to do a first push into politikorama and
that it should be subject to human proof-reading.
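For reference, here is a minimal sketch (not part of these tools) of the pattern the scrapers below use to fetch and parse a single wiki page; the page name is a made-up placeholder.

```python
# Fetch one representative page from the lqdn wiki and extract the name,
# following the same parsing logic as the scrapers in this folder.
from time import sleep

import requests
from bs4 import BeautifulSoup

depute = "NomDePageExemple"  # hypothetical wiki page name, for illustration only
sleep(.2)  # be polite with the lqdn wiki, as the scrapers are
content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
soup = BeautifulSoup(content, features="lxml")
headline = soup.find("span", attrs={"class": "mw-headline"})
if headline is not None:
    # The scrapers expect a headline like "xxx : Prénom Nom, ..." and keep the name
    name = headline.text.split(",")[0].split(":")[1].strip()
else:
    name = depute
print(name)
```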


@@ -1,13 +1,15 @@
# encoding: utf-8
import os
from string import ascii_uppercase
from time import sleep
import sys
from time import sleep
from bs4 import BeautifulSoup
import requests
url = "https://wiki.laquadrature.net/index.php?title=Cat%C3%A9gorie:D%C3%A9put%C3%A9s&pagefrom="
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
deputes = []
for letter in ascii_uppercase:
@@ -25,6 +27,7 @@ for letter in ascii_uppercase:
sys.stdout.flush()
print()
with open("../tmp/liste_deputes.txt", "w", encoding="utf-8") as file_handler:
data_folder = os.path.join(data_root, "liste_deputes.txt")
with open(data_folder, "w", encoding="utf-8") as file_handler:
for depute in sorted(list(set(deputes))):
file_handler.write(f"{depute}\n")


@@ -3,30 +3,37 @@
import csv
from datetime import datetime
import locale
import os
import re
from string import ascii_uppercase
from time import sleep
import sys
from time import sleep
from bs4 import BeautifulSoup
import requests
from slugify import slugify
url = "https://wiki.laquadrature.net/index.php?title=Cat%C3%A9gorie:D%C3%A9put%C3%A9s&pagefrom="
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
locale.setlocale(locale.LC_ALL, "FR")
deputes = []
with open("../tmp/liste_deputes.txt", encoding="utf-8") as file_handler:
data_source = os.path.join(data_root, "liste_deputes.txt")
with open(data_source, encoding="utf-8") as file_handler:
deputes = file_handler.read().splitlines()
# Extract representatives
with open("../tmp/lqdn_representatives.csv", "w", encoding="utf-8", newline="") as csvfile:
data_target = os.path.join(data_root, "lqdn_representatives.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["name", "picture", "nationality", "sex", "birth_date", "birth_place", "job"])
for depute in deputes:
print(".", end="")
sys.stdout.flush()
try:
# Do not DDOS lqdn wiki ;o)
sleep(.2)
# Loading informations
content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
soup = BeautifulSoup(content, features="lxml")
@@ -59,3 +66,6 @@ with open("../tmp/lqdn_representatives.csv", "w", encoding="utf-8", newline="")
birth_city,
job_name,
])
except AttributeError:
print(f"\nError while scraping representative '{depute}'")
continue


@@ -2,22 +2,30 @@
from datetime import datetime
from string import ascii_uppercase
from time import sleep
import os
import sys
from time import sleep
from bs4 import BeautifulSoup
import csv
import requests
from slugify import slugify
with open("../tmp/liste_deputes.txt") as file_handler:
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
data_source = os.path.join(data_root, "liste_deputes.txt")
with open(data_source) as file_handler:
deputes = file_handler.read().splitlines()
with open("../tmp/lqdn_stances.csv", "w", encoding="utf-8", newline="") as csvfile:
data_target = os.path.join(data_root, "lqdn_stances.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["name", "slug", "matter", "subject", "date", "extract", "source_url"])
for depute in deputes:
print(".", end="")
sys.stdout.flush()
try:
# Do not DDOS lqdn wiki ;o)
sleep(.2)
content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
@@ -58,5 +66,6 @@ with open("../tmp/lqdn_stances.csv", "w", encoding="utf-8", newline="") as csvfi
stance_quote,
stance_link,
])
print(".", end="")
sys.stdout.flush()
except AttributeError:
print(f"\nError while scraping stances for representative '{depute}'")
continue


@@ -2,45 +2,55 @@
from datetime import datetime
from string import ascii_uppercase
from time import sleep
import os
import sys
from time import sleep
from bs4 import BeautifulSoup
import csv
import requests
from slugify import slugify
with open("lqdn_representatives.txt") as file_handler:
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
data_source = os.path.join(data_root, "liste_deputes.txt")
with open(data_source) as file_handler:
deputes = file_handler.read().splitlines()
with open("lqdn_votes.csv", "w", encoding="utf-8", newline="") as csvfile:
data_target = os.path.join(data_root, "lqdn_votes.csv")
with open(data_target, "w", encoding="utf-8", newline="") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["name", "slug", "matter", "subject", "date", "extract", "source_url"])
writer.writerow(["name", "slug", "matter", "date", "source_url"])
for depute in deputes:
print(".", end="")
sys.stdout.flush()
try:
# Do not DDOS lqdn wiki ;o)
sleep(.2)
content = requests.get(f"https://wiki.laquadrature.net/{depute}").text
soup = BeautifulSoup(content, features="lxml")
deputy = soup.find("span", attrs={"class": "mw-headline"})
if deputy is not None:
stance_author = deputy.text.split(",")[0].split(":")[1].strip()
vote_author = deputy.text.split(",")[0].split(":")[1].strip()
else:
stance_author = depute
print(stance_author)
vote_author = depute
votes = soup.find("span", attrs={"id": "Votes"}).parent.find_next_sibling("ul")
if votes is not None:
for vote in votes.find_all("li"):
pass##print(f" {vote}")
writer.writerow([
stance_author,
slugify(stance_author),
stance_matter,
stance_subject,
stance_date,
stance_quote,
stance_link,
])
print(".", end="")
sys.stdout.flush()
vote_date = datetime.strptime(vote.text.split()[0], "%d/%m/%Y")
vote_matter = vote.find("a").text
vote_url = vote.find("a").get("href")
writer.writerow([
vote_author,
slugify(vote_author),
vote_matter,
datetime.strftime(vote_date, "%Y-%m-%d"),
vote_url,
])
print()
except (AttributeError, ValueError):
print(f"\nError while scraping stances for representative '{depute}'")
continue

parltrack/README.md (new file, 12 lines)

@@ -0,0 +1,12 @@
# Parltrack
This organisation collects data about the European Parliament.
These tools are used to scrape:
* Representatives' basic information
  * EP code, name, nationality, sex, birth date and place, job
Keep in mind that all this data can be used to do a first push into politikorama and
that it should be subject to human proof-reading.
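As an illustration only, here is a minimal sketch that counts the records of a decompressed `ep_meps.json` dump; the file location is an assumption, since the download script decides where the data ends up.

```python
# Count MEP records in a locally decompressed Parltrack dump.
import json
import os

data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
# Assumed location of the decompressed dump; adjust to wherever the download
# script actually unpacked it.
dump_path = os.path.join(data_root, "ep_meps.json")

with open(dump_path, encoding="utf-8") as file_handler:
    meps = json.load(file_handler)
print(f"{len(meps)} MEP records to proof-read before any push into politikorama")
```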


@@ -1,286 +0,0 @@
# encoding: utf-8
from datetime import datetime
from io import StringIO, BytesIO
import json
import os
import tempfile
import zipfile
import click
import requests
from app.model.address import AddressModel
from app.model.contact import ContactModel
from app.model.entity import EntityModel
from app.model.membership import MembershipModel
from app.model.representative import RepresentativeModel
from app.model.type import TypeModel
from command.json_reader import json_reader
import json
def import_representatives(filepath):
click.echo("Importing representatives from parltrack")
click.echo(" Reading file")
with open(filepath) as f:
meps = json.load(f)
for representative in meps:
click.echo(".", nl=False)
#for representative in json_reader(filepath):
# click.echo(".", nl=False)
def toto():
# Delete only things related to "Assemblée Nationale" !
MembershipModel.query.delete() #filter_by(source="Assemblée Nationale").delete()
RepresentativeModel.query.filter_by(source="Assemblée Nationale").delete()
AddressModel.query.filter_by(source="Assemblée Nationale").delete()
ContactModel.query.filter_by(source="Assemblée Nationale").delete()
EntityModel.query.filter_by(source="Assemblée Nationale").delete()
TypeModel.query.filter_by(source="Assemblée Nationale").delete()
url = "https://data.assemblee-nationale.fr/static/openData/repository/15/amo/tous_acteurs_mandats_organes_xi_legislature/AMO30_tous_acteurs_tous_mandats_tous_organes_historique.json.zip"
if False:
datas = BytesIO()
result = requests.get(url, stream=True)
datas.write(result.content)
datas.seek(0)
with tempfile.TemporaryDirectory() as tmpdir:
with zipfile.ZipFile(datas, "r") as zip_ref:
zip_ref.extractall(tmpdir)
print(tmpdir)
for root, dirs, files in os.walk(tmpdir):
if root.endswith("acteur"):
for filename in files:
print(os.path.join(root, filename))
for filename in files[:1]:
with open(os.path.join(root, filename)) as filehandler:
data = json.load(filehandler)
print(json.dumps(data, indent=2))
# Testing
tmpdir = "C:\\Users\\tbouchet\\Downloads\\json"
click.echo(" ", nl=False)
for root, dirs, files in os.walk(tmpdir):
if root.endswith("organe"):
with click.progressbar(files, label="Entities") as progress_files:
entities = []
for filename in progress_files:
#print(filename)
with open(os.path.join(root, filename)) as filehandler:
data = json.load(filehandler)["organe"]
# Type
# file:///C:/Users/tbouchet/Downloads/html/Schemas_Entites/AMO/Schemas_Organes.html
type_types = {
"API": "Assemblée parlementaire internationale",
"ASSEMBLEE": "Assemblée nationale",
"ASSEXT": "Autres conseils",
"ASSTOM": "Assemblée territoriale dOutre-Mer",
"CES": "Conseil économique, social et environnemental",
"CJR": "Cour de justice de la République",
"CMP": "Commissions mixtes paritaires",
"CNPE": "Commissions denquêtes",
"CNPS": "Commissions spéciales",
"COMMUNE": "Conseil Municipal",
"COMNL": "Autres commissions permanentes",
"COMPER": "Commissions permanentes législatives",
"COMSENAT": "Commissions du Sénat",
"COMSPSENAT": "Commissions spéciales du Sénat",
"CONFPT": "CONFPT",
"CONSTITU": "Conseil constitutionnel",
"DELEG": "Délégation parlementaire",
"DELEGBUREAU": "Délégation du Bureau (de lAN)",
"DELEGSENAT": "Délégation du Sénat",
"DEPARTEMENT": "Conseil général ou départemental",
"EUROPE": "Mandat européen",
"GA": "Groupe damitié",
"GE": "Groupe détudes",
"GEVI": "Groupe détudes à vocation internationale",
"GOUVERNEMENT": "Gouvernement",
"GP": "Groupe politique",
"GROUPESENAT": "Groupe Sénat",
"HCJ": "Haute Cour de justice",
"INTCO": "Intercommunalité",
"MINISTERE": "Ministère",
"MISINFO": "Missions dinformations",
"MISINFOCOM": "Missions dinformation communes",
"MISINFOPRE": "Missions dinformation de la conférence des Présidents",
"OFFPAR": "Office parlementaire ou délégation mixte",
"ORGAINT": "Organisme international",
"ORGEXTPARL": "Organisme extra parlementaire",
"PARPOL": "Parti Politique",
"PRESREP": "Présidence de la République",
"REGION": "Conseil régional",
"SENAT": "Mandat de sénateur",
}
type = TypeModel.query.filter_by(name = type_types[data["codeType"]]).first()
if type is None:
type = TypeModel()
type.source = "Assemblée Nationale"
type.source_uid = data["codeType"]
type.name = type_types[data["codeType"]]
type.save()
# Entity
entity = EntityModel(
source = "Assemblée Nationale",
source_uid = data["uid"],
type_id = type.id,
name = data["libelle"],
code = data["libelleAbrev"],
country_id = country.id,
)
if data["organeParent"] is not None:
parent = EntityModel.query.filter_by(source_uid=data["organeParent"]).first()
if parent is not None:
entity.parent_id = parent.id
else:
print(data["uid"], data["organeParent"])
entity.save()
for root, dirs, files in os.walk(tmpdir):
if root.endswith("acteur"):
with click.progressbar(files, label="Representatives") as progress_files:
for filename in progress_files:
with open(os.path.join(root, filename)) as filehandler:
data = json.load(filehandler)["acteur"]
# Representative
representative = RepresentativeModel()
representative.source = "Assemblée Nationale"
representative.source_uid = data["uid"]["#text"]
nom = data["etatCivil"]["ident"]["nom"]
prenom = data["etatCivil"]["ident"]["prenom"]
representative.name = f"{prenom} {nom}"
representative.nationality_id = country.id
representative.birth_date = datetime.strptime(
data["etatCivil"]["infoNaissance"]["dateNais"], "%Y-%m-%d"
)
if isinstance(data["etatCivil"]["infoNaissance"]["villeNais"], str):
representative.birth_place = data["etatCivil"]["infoNaissance"][
"villeNais"
]
if isinstance(data["profession"]["libelleCourant"], str):
representative.profession = data["profession"]["libelleCourant"]
representative.save()
# Address
if data["adresses"].get("adresse", "") != "":
address_types = {
"0": "Parliament address",
"1": "Address",
"2": "Constituency address",
}
def manage_address(data_address):
if data_address["type"] in address_types:
address = AddressModel()
address.representative_id = representative.id
address.source = "Assemblée Nationale"
address.source_uid = data_address["uid"]
address.name = address_types[data_address["type"]]
address.country_id = country.id
address.number = data_address["numeroRue"]
address.street = data_address["nomRue"]
address.miscellaneous = data_address[
"complementAdresse"
]
address.city = data_address["ville"]
address.zipcode = data_address["codePostal"]
address.save()
if isinstance(data["adresses"]["adresse"], list):
for data_address in data["adresses"]["adresse"]:
manage_address(data_address)
elif isinstance(data["adresses"]["adresse"], dict):
manage_address(data["adresses"]["adresse"])
# Contact
contact_types = {
"3": "Phone (Press contact)",
"11": "Phone",
"12": "Fax",
"15": "Email",
"22": "Website",
"23": "Senate URL",
"24": "Twitter",
"25": "Facebook",
}
def manage_contact(data_contact):
if data_contact["type"] in contact_types:
contact = ContactModel()
contact.representative_id = representative.id
contact.source = "Assemblée Nationale"
contact.source_uid = data_contact["uid"]
if data_contact["adresseDeRattachement"] is not None:
address = AddressModel.query.filter_by(
source_uid=data_contact["adresseDeRattachement"]
).first()
if address is not None:
contact.address_id = address.id
contact.name = contact_types[data_contact["type"]]
contact.value = data_contact["valElec"]
contact.save()
if isinstance(data["adresses"]["adresse"], list):
for data_contact in data["adresses"]["adresse"]:
manage_contact(data_contact)
elif isinstance(data["adresses"]["adresse"], dict):
manage_contact(data["adresses"]["adresse"])
# Unknown addresses ?
if isinstance(data["adresses"]["adresse"], list):
for data_address in data["adresses"]["adresse"]:
if data_address["type"] not in dict(
address_types, **contact_types
):
print(
f" => Unkown address type : {data_address['type']} in file {filename} : {data_address['typeLibelle']}"
)
elif isinstance(data["adresses"]["adresse"], dict):
data_address = data["adresses"]["adresse"]
if data_address["type"] not in dict(
address_types, **contact_types
):
print(
f" => Unkown address type : {data_address['type']} in file {filename} : {data_address['typeLibelle']}"
)
if data["mandats"].get("mandat", "") != "":
# Membership
membership_types = {
"Membre": "Member",
}
def manage_membership(data_membership):
if data_membership["infosQualite"]["codeQualite"] in membership_types:
entity = EntityModel.query.filter_by(source_uid=data_membership["organes"]["organeRef"]).first()
if entity is None:
print("Organe inconnu", data_membership["organes"]["organeRef"])
return
membership = MembershipModel()
membership.representative_id = representative.id
membership.role = membership_types[data_membership["infosQualite"]["codeQualite"]]
membership.country_id = country.id
if data_membership["dateDebut"] is not None:
membership.start = datetime.strptime(
data_membership["dateDebut"], "%Y-%m-%d"
)
if data_membership["dateFin"] is not None:
membership.end = datetime.strptime(
data_membership["dateFin"], "%Y-%m-%d"
)
membership.entity_id = entity.id
membership.save()
if isinstance(data["mandats"]["mandat"], list):
for data_membership in data["mandats"]["mandat"]:
manage_membership(data_membership)
elif isinstance(data["mandats"]["mandat"], dict):
manage_membership(data["mandats"]["mandat"])


@@ -0,0 +1,31 @@
# encoding: utf-8
import os
import shutil
import lzip
url = "https://parltrack.org/dumps/ep_meps.json.lz"
data_root = os.environ.get("POLITIKORAMA_DATA_ROOT", "../tmp")
data_source = os.path.join(data_root, "ep_meps.json.lz")
data_target = os.path.join(data_root, "json")
# Cleaning old data
try:
os.remove(data_source)
except FileNotFoundError:
# No file to remove
pass
try:
shutil.rmtree(data_target)
except FileNotFoundError:
# No folder to remove
pass
# Download and extract data
print("Downloading archive")
with open(data_source, "wb") as f:
for chunk in lzip.decompress_url_iter(url):
f.write(chunk)
os.remove(data_source)