# encoding: utf-8
"""Flask-Admin views importing CSV files into the application models.

Every view follows the same pattern: ``default_import`` validates the upload
and maps the CSV header line onto the expected columns, then the view walks
the remaining rows, creating unknown records and updating known ones. Nothing
is ever deleted by an import.
"""

import csv
from datetime import datetime
from importlib import import_module, invalidate_caches
from io import StringIO

from flask import current_app, g, request, redirect, url_for
from flask_admin import BaseView, expose
from slugify import slugify
from sqlalchemy import or_

from app.form.admin import ImportForm
from app.model.address import AddressModel
from app.model.contact import ContactModel
from app.model.country import CountryModel
from app.model.decision import DecisionModel
from app.model.entity import EntityModel
from app.model.matter import MatterModel
from app.model.membership import MembershipModel
from app.model.recommendation import RecommendationModel
from app.model.representative import RepresentativeModel
from app.model.role import RoleModel
from app.model.stance import StanceModel
from app.model.type import TypeModel


# Help lines shared by the "formats" section of every import page.
_COMMON_FORMATS = [
    "File format accepted is CSV (Comma Separated Values).",
    "First line should be column headers.",
    "Other lines are values.",
]


def default_import(headers):
    """Prepare an import request and map the CSV header line onto ``headers``.

    ``headers`` maps each expected column name to ``None`` (required column)
    or ``"optional"``. It is updated in place: on a valid submission every
    column found in the file gets its zero-based index, and optional columns
    that are absent end up as ``None``.

    Side effects: sets ``g.form``, ``g.errors`` and ``g.messages``. One error
    is appended to ``g.errors`` per missing required column.

    Returns:
        The ``csv.reader`` positioned just after the header line, or ``None``
        when the form was not submitted, did not validate, or the file could
        not be decoded.
    """
    g.form = ImportForm()
    g.errors = []
    g.messages = []
    reader = None
    if g.form.validate_on_submit():
        try:
            # "utf-8-sig" reads plain UTF-8 too and drops the byte order mark
            # some spreadsheet tools prepend, which would otherwise corrupt
            # the first column name.
            content = request.files["filename"].read().decode("utf-8-sig")
        except UnicodeDecodeError:
            g.errors.append("File must be encoded in UTF-8.")
            return None
        reader = csv.reader(StringIO(content))
        for index, title in enumerate(next(reader, [])):
            title = title.strip().lower()
            if title in headers:
                headers[title] = index
        for header in headers:
            if headers[header] is None:
                g.errors.append(f"Column {header} not found.")
        # Only now can optional columns be reset: doing it earlier would make
        # them indistinguishable from missing required ones.
        for header in headers:
            if headers[header] == "optional":
                headers[header] = None
    return reader


def _rows(reader):
    """Yield the non-empty rows of ``reader``.

    ``csv.reader`` returns ``[]`` for blank lines (a trailing newline at the
    end of the file is enough), and indexing such a row raises ``IndexError``.
    """
    return (row for row in reader if row)


def _optional(row, headers, name, default=None):
    """Return the cell of an optional column, or ``default`` if it is absent.

    ``headers[name]`` is ``None`` when the uploaded file has no such column.
    """
    index = headers[name]
    if index is None:
        return default
    return row[index]


def _parse_date(text):
    """Parse a 'YYYY-MM-DD' string into a ``datetime``; '' gives ``None``.

    Raises ``ValueError`` (from ``datetime.strptime``) on any other format.
    """
    if text == "":
        return None
    return datetime.strptime(text, "%Y-%m-%d")


def _update_fields(instance, values):
    """Assign every attribute of ``instance`` that differs from ``values``.

    Returns ``True`` when at least one attribute was changed.
    """
    changed = False
    for attribute, value in values.items():
        if getattr(instance, attribute) != value:
            setattr(instance, attribute, value)
            changed = True
    return changed


class ImportAddressView(BaseView):
    """Admin view importing addresses, matched on the slug of their name."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "name": None,
            "country_code": None,
            "number": "optional",
            "street": "optional",
            "miscellaneous": "optional",
            "city": None,
            "zipcode": None,
            "building": "optional",
            "floor": "optional",
            "stair": "optional",
            "office": "optional",
            "latitude": "optional",
            "longitude": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import addresses",
            "description": "Importing addresses will add unknown ones and update known"
            " ones. If an address is not present in imported file it will not be"
            " deleted.",
            "endpoint": "addresses.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name' and be the unique name of the address.",
                "One column MUST be 'country_code'.",
                "One column MUST be 'city' and be the name of a city.",
                "One column MUST be 'zipcode' and be the zipcode of this city.",
            ],
            "examples": [
                "name,country_code,city,zipcode,building,floor",
                "Assemblée Nationale,FR,Paris,75000,,",
                "Victor's Office,BE,Bruxelles,1000,A,2",
                "White House,US,Washington DC,20500-0003,,",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                address_country = CountryModel.query.filter_by(
                    code=row[headers["country_code"]].upper()
                ).first()
                # Optional columns missing from the file are imported as None,
                # like the optional columns of the other import views.
                # NOTE(review): latitude/longitude are compared as the raw CSV
                # strings - confirm against the model column types.
                values = {
                    "country": address_country,
                    "number": _optional(row, headers, "number"),
                    "street": _optional(row, headers, "street"),
                    "miscellaneous": _optional(row, headers, "miscellaneous"),
                    "city": row[headers["city"]],
                    "zipcode": row[headers["zipcode"]],
                    "building": _optional(row, headers, "building"),
                    "floor": _optional(row, headers, "floor"),
                    "stair": _optional(row, headers, "stair"),
                    "office": _optional(row, headers, "office"),
                    "latitude": _optional(row, headers, "latitude"),
                    "longitude": _optional(row, headers, "longitude"),
                }
                address = AddressModel.query.filter_by(
                    slug=slugify(name),
                ).first()
                if address is None:
                    address = AddressModel(
                        name=name,
                        slug=slugify(name),
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    address.save()
                elif _update_fields(address, values):
                    g.messages.append(f"{name} updated.")
                    address.save()
        return self.render("admin/import.html")


class ImportContactView(BaseView):
    """Admin view importing contacts, matched on representative and name."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "representative_slug": None,
            "address_slug": "optional",
            "name": None,
            "value": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import contacts",
            "description": "Importing contacts will add unknown ones and update known"
            " ones. If a contact is not present in imported file it will not be"
            " deleted.",
            "endpoint": "contacts.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name' and be the name of the contact.",
                "One column MUST be 'value' and be the value of this contact.",
                "One column MUST be 'representative_slug' and be the slug identifying one representative.",
                "One column COULD be 'address_slug' and be the slug identifying an address.",
            ],
            "examples": [
                "representative_slug,name,value,address_slug",
                "victor-hugo,Twitter,@vic,",
                "barack-obama,Phone,123456789,white-house",
                "barack-obama,Email,barack@obama.us,",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                representative_slug = row[headers["representative_slug"]]
                contact_representative = RepresentativeModel.query.filter_by(
                    slug=representative_slug,
                ).first()
                if contact_representative is None:
                    # A contact cannot be matched or stored without its
                    # representative: report the row and keep going.
                    g.errors.append(
                        f"Representative {representative_slug} not found."
                    )
                    continue
                if headers["address_slug"] is not None:
                    contact_address = AddressModel.query.filter_by(
                        slug=row[headers["address_slug"]],
                    ).first()
                else:
                    contact_address = None
                name = row[headers["name"]]
                contact = ContactModel.query.filter_by(
                    slug=slugify(name),
                    representative_id=contact_representative.id,
                ).first()
                values = {
                    "address": contact_address,
                    "value": row[headers["value"]],
                }
                if contact is None:
                    contact = ContactModel(
                        representative=contact_representative,
                        name=name,
                        slug=slugify(name),
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    contact.save()
                elif _update_fields(contact, values):
                    g.messages.append(f"{name} updated.")
                    contact.save()
        return self.render("admin/import.html")


class ImportCountryView(BaseView):
    """Admin view importing countries, matched on their upper-cased code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {"name": None, "code": None}
        reader = default_import(headers)
        g.what = {
            "title": "Import countries",
            "description": "Importing countries will add unknown ones and update known"
            " ones. If a country is not present in imported file it will not be"
            " deleted.",
            "endpoint": "countries.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
            ],
            "examples": [
                "name,code",
                "France,FR",
                "United States of America,US",
                "Spain,SP",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                # Codes are stored upper-cased, so the lookup must use the
                # same form or a lower-case code never matches its own row.
                code = row[headers["code"]].upper()
                country = CountryModel.query.filter_by(code=code).first()
                if country is None:
                    country = CountryModel(
                        name=name,
                        slug=slugify(name),
                        code=code,
                    )
                    g.messages.append(f"{name} added.")
                    country.save()
                elif country.name != name:
                    country.name = name
                    country.slug = slugify(name)
                    g.messages.append(f"{name} updated.")
                    country.save()
        return self.render("admin/import.html")


class ImportDecisionView(BaseView):
    """Admin view importing decisions of representatives on recommendations."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "representative_slug": None,
            "recommendation_slug": None,
            "value": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import decisions",
            "description": "Importing decisions will add unknown ones and update known"
            " ones. If a decision is not present in imported file it will not be"
            " deleted.",
            "endpoint": "decisions.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'value' and be the value of the decision.",
                "One column MUST be 'representative_slug' and be the slug identifying one representative.",
                "One column MUST be 'recommendation_slug' and be the slug identifying one recommendation.",
            ],
            "examples": [
                "representative_slug,recommendation_slug,value",
                "victor-hugo,no-food-for-myself,Yes",
                "",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                representative_slug = row[headers["representative_slug"]]
                recommendation_slug = row[headers["recommendation_slug"]]
                value = row[headers["value"]]
                decision_representative = RepresentativeModel.query.filter_by(
                    slug=representative_slug,
                ).first()
                decision_recommendation = RecommendationModel.query.filter_by(
                    slug=recommendation_slug,
                ).first()
                decision = DecisionModel.query.filter_by(
                    representative=decision_representative,
                    recommendation=decision_recommendation,
                ).first()
                if decision is None:
                    decision = DecisionModel(
                        representative=decision_representative,
                        recommendation=decision_recommendation,
                        value=value,
                    )
                    g.messages.append(
                        f"Decision for {recommendation_slug} by {representative_slug} added."
                    )
                    decision.save()
                elif decision.value != value:
                    decision.value = value
                    g.messages.append(
                        f"Decision for {recommendation_slug} by {representative_slug} updated."
                    )
                    decision.save()
        return self.render("admin/import.html")


class ImportEntityView(BaseView):
    """Admin view importing entities, matched on their code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "type_code": None,
            "country_code": None,
            "name": None,
            "code": None,
            "picture": "optional",
            "start": None,
            "end": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import entities",
            "description": "Importing entities will add unknown ones and update known"
            " ones. If an entity is not present in imported file it will not be"
            " deleted.",
            "endpoint": "entities.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name' and be the name of the entity.",
                "One column MUST be 'code' and be the unique code of this entity.",
                "One column COULD be 'picture' and be a link to the logo of the entity.",
                "One column MUST be 'start' and be a 'YYYY-MM-DD' date.",
                "One column MUST be 'end' and be null or a 'YYYY-MM-DD' date.",
                "One column MUST be 'type_code' and be a known type code.",
                "One column MUST be 'country_code' and be a known country code.",
            ],
            "examples": [
                "name,type_code,country_code,code,picture,start,end",
                "Gouvernement,GOV,US,WH,,2021-01-01,2021-04-01",
                "",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                code = row[headers["code"]]
                entity_type = TypeModel.query.filter_by(
                    code=row[headers["type_code"]]
                ).first()
                entity_country = CountryModel.query.filter_by(
                    code=row[headers["country_code"]].upper()
                ).first()
                entity = EntityModel.query.filter_by(code=code).first()
                # NOTE(review): start/end are datetime objects; if the model
                # stores plain dates the comparison below always differs -
                # confirm against EntityModel.
                values = {
                    "type": entity_type,
                    "country": entity_country,
                    "picture": _optional(row, headers, "picture"),
                    "start": _parse_date(row[headers["start"]]),
                    "end": _parse_date(row[headers["end"]]),
                }
                if entity is None:
                    entity = EntityModel(
                        name=name,
                        slug=slugify(name),
                        code=code,
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    entity.save()
                else:
                    updated = _update_fields(entity, values)
                    # The slug only follows the name when the name changes.
                    if entity.name != name:
                        entity.name = name
                        entity.slug = slugify(name)
                        updated = True
                    if updated:
                        g.messages.append(f"{name} updated.")
                        entity.save()
        return self.render("admin/import.html")


class ImportMatterView(BaseView):
    """Admin view importing matters, matched on the slug of their name."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "name": None,
            "description": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import matters",
            "description": "Importing matters will add unknown ones and update known"
            " ones. If a matter is not present in imported file it will not be"
            " deleted.",
            "endpoint": "matters.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name' and be the unique name of the matter.",
                "One column COULD be 'description' and be the description of the matter.",
            ],
            "examples": [
                "name,description",
                "How to ?,Well an how-to is a strange case bla bla bla.",
                "",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                description = _optional(row, headers, "description")
                matter = MatterModel.query.filter_by(
                    slug=slugify(name),
                ).first()
                if matter is None:
                    matter = MatterModel(
                        name=name,
                        slug=slugify(name),
                        description=description,
                    )
                    g.messages.append(f"Matter {name} added.")
                    matter.save()
                elif matter.description != description:
                    matter.description = description
                    g.messages.append(f"Matter {name} updated.")
                    matter.save()
        return self.render("admin/import.html")


class ImportMembershipView(BaseView):
    """Admin view importing memberships of representatives in entities."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "representative_slug": None,
            "entity_code": None,
            "role_code": None,
            "start": None,
            "end": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import memberships",
            "description": "Importing memberships will add unknown ones and update"
            " known ones. If a membership is not present in imported file it"
            " will not be deleted.",
            "endpoint": "memberships.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'representative_slug'.",
                "One column MUST be 'entity_code'.",
                "One column MUST be 'role_code'.",
                "One column MUST be 'start'.",
                "One column MUST be 'end'.",
            ],
            "examples": [
                "representative_slug,entity_code,role_code,start,end",
                "victor-hugo,WH,CEO,2021-01-01,2021-04-01",
                "barack-obama,WH,Member,2021-01-01,",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                representative_slug = row[headers["representative_slug"]]
                representative = RepresentativeModel.query.filter_by(
                    slug=slugify(representative_slug),
                ).first()
                entity = EntityModel.query.filter_by(
                    code=row[headers["entity_code"]],
                ).first()
                role = RoleModel.query.filter_by(
                    code=row[headers["role_code"]],
                ).first()
                start_date = _parse_date(row[headers["start"]])
                end_date = _parse_date(row[headers["end"]])
                # The start date is part of the identity of a membership;
                # only its end date can be updated by an import.
                membership = MembershipModel.query.filter_by(
                    representative=representative,
                    entity=entity,
                    role=role,
                    start=start_date,
                ).first()
                if membership is None:
                    membership = MembershipModel(
                        representative=representative,
                        entity=entity,
                        role=role,
                        start=start_date,
                        end=end_date,
                    )
                    g.messages.append(
                        f"Membership of {representative_slug} added."
                    )
                    membership.save()
                elif membership.end != end_date:
                    membership.end = end_date
                    g.messages.append(
                        f"Membership of {representative_slug} updated."
                    )
                    membership.save()
        return self.render("admin/import.html")


class ImportRecommendationView(BaseView):
    """Admin view importing recommendations, matched on matter, entity, code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "matter_slug": None,
            "entity_code": None,
            "name": None,
            "code": None,
            "date": None,
            "description": "optional",
            "value": None,
            "weight": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import recommendations",
            "description": "Importing recommendations will add unknown ones and update"
            " known ones. If a recommendation is not present in imported file it"
            " will not be deleted.",
            "endpoint": "recommendations.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'matter_slug'.",
                "One column MUST be 'entity_code'.",
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
                "One column MUST be 'date'.",
                "One column COULD be 'description'.",
                "One column MUST be 'value'.",
                "One column COULD be 'weight'.",
            ],
            "examples": [
                "matter_slug,entity_code,name,code,date,description,value,weight",
                "how-to,WH,No food for myself,R1,2021-02-01,,Yes,1",
                "",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                code = row[headers["code"]]
                matter = MatterModel.query.filter_by(
                    slug=row[headers["matter_slug"]],
                ).first()
                entity = EntityModel.query.filter_by(
                    code=row[headers["entity_code"]],
                ).first()
                # A missing or empty weight falls back to 1.
                raw_weight = _optional(row, headers, "weight", "")
                values = {
                    "date": _parse_date(row[headers["date"]]),
                    "description": _optional(row, headers, "description"),
                    "value": row[headers["value"]],
                    "weight": int(raw_weight) if raw_weight != "" else 1,
                }
                recommendation = RecommendationModel.query.filter_by(
                    matter=matter,
                    entity=entity,
                    code=code,
                ).first()
                if recommendation is None:
                    recommendation = RecommendationModel(
                        matter=matter,
                        entity=entity,
                        name=name,
                        slug=slugify(name),
                        code=code,
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    recommendation.save()
                else:
                    updated = _update_fields(recommendation, values)
                    # The slug only follows the name when the name changes.
                    if recommendation.name != name:
                        recommendation.name = name
                        recommendation.slug = slugify(name)
                        updated = True
                    if updated:
                        g.messages.append(f"{name} updated.")
                        recommendation.save()
        return self.render("admin/import.html")


class ImportRepresentativeView(BaseView):
    """Admin view importing representatives, matched on their code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "code": None,
            "name": None,
            "picture": None,
            "nationality": None,
            "sex": None,
            "birth_date": None,
            "birth_place": None,
            "job": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import representatives",
            "description": "Importing representatives will add unknown ones and update"
            " known ones. If a representative is not present in imported file it"
            " will not be deleted.",
            "endpoint": "representatives.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'code' and be the unique identifier of the representative.",
                "One column MUST be 'name'.",
                "One column MUST be 'picture'.",
                "One column MUST be 'nationality' and be an ISO country code.",
                "One column MUST be 'sex' and be 'F' or 'M'.",
                "One column MUST be 'birth_date'.",
                "One column MUST be 'birth_place'.",
                "One column MUST be 'job'.",
            ],
            "examples": [
                "code,name,picture,nationality,sex,birth_date,birth_place,job",
                "VH,Victor Hugo,,FR,M,1802-02-26,Besançon,Writer",
                "BO,Barack Obama,,US,M,1961-08-04,Honolulu,Lawyer",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                code = row[headers["code"]]
                representative = RepresentativeModel.query.filter_by(
                    code=code,
                ).first()
                country = CountryModel.query.filter_by(
                    code=row[headers["nationality"]].upper(),
                ).first()
                birth_date = _parse_date(row[headers["birth_date"]])
                if birth_date is not None:
                    birth_date = birth_date.date()
                # The name is only used at creation; an existing
                # representative keeps its name and slug.
                values = {
                    "picture": row[headers["picture"]],
                    "nationality": country,
                    "sex": row[headers["sex"]],
                    "birth_date": birth_date,
                    "birth_place": row[headers["birth_place"]],
                    "job": row[headers["job"]],
                }
                if representative is None:
                    representative = RepresentativeModel(
                        code=code,
                        name=name,
                        slug=slugify(name),
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    representative.save()
                elif _update_fields(representative, values):
                    g.messages.append(f"{name} updated.")
                    representative.save()
        return self.render("admin/import.html")


class ImportRoleView(BaseView):
    """Admin view importing roles, matched on their code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {"name": None, "code": None}
        reader = default_import(headers)
        g.what = {
            "title": "Import roles",
            "description": "Importing roles will add unknown ones and update known"
            " ones. If a role is not present in imported file it will not be"
            " deleted.",
            "endpoint": "roles.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
            ],
            "examples": [
                "code,name",
                "GP,Groupe parlementaire",
                "Member,Membre",
                "CEO,Chief Executive Officer",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                code = row[headers["code"]]
                role = RoleModel.query.filter_by(code=code).first()
                if role is None:
                    role = RoleModel(
                        name=name,
                        slug=slugify(name),
                        code=code,
                    )
                    g.messages.append(f"{name} added.")
                    role.save()
                elif role.name != name:
                    role.name = name
                    role.slug = slugify(name)
                    g.messages.append(f"{name} updated.")
                    role.save()
        return self.render("admin/import.html")


class ImportStanceView(BaseView):
    """Admin view importing stances taken by representatives on matters.

    Unknown representatives and matters are created on the fly.
    """

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {
            "name": None,
            "slug": None,
            "matter": None,
            "subject": None,
            "date": None,
            "extract": None,
            "source_url": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import stances",
            "description": "Importing stances will add unknown ones and update"
            " known ones. If a stance is not present in imported file it"
            " will not be deleted.",
            "endpoint": "stances.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name' and be the name of the representative.",
                "One column MUST be 'slug' and be the slug identifying the representative.",
                "One column MUST be 'matter' and be the name of the matter.",
                "One column MUST be 'subject'.",
                "One column MUST be 'date' and be empty or a 'YYYY-MM-DD' date.",
                "One column MUST be 'extract'.",
                "One column MUST be 'source_url'.",
            ],
            "examples": [
                "name,slug,matter,subject,date,extract,source_url",
                "Victor Hugo,victor-hugo,How to ?,Speech,2021-03-01,An extract of the speech.,https://example.org/speech",
                "",
                "",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                # Representative, created when its slug is unknown.
                representative = RepresentativeModel.query.filter_by(
                    slug=row[headers["slug"]],
                ).first()
                if representative is None:
                    representative = RepresentativeModel(
                        name=name,
                        slug=row[headers["slug"]],
                    )
                    representative.save()
                # Matter, created when the slug of its name is unknown.
                matter_name = row[headers["matter"]]
                matter = MatterModel.query.filter_by(
                    slug=slugify(matter_name),
                ).first()
                if matter is None:
                    matter = MatterModel(
                        name=matter_name,
                        slug=slugify(matter_name),
                    )
                    matter.save()
                values = {
                    "subject": row[headers["subject"]],
                    "date": _parse_date(row[headers["date"]]),
                    "extract": row[headers["extract"]],
                    "source_url": row[headers["source_url"]],
                }
                # Stance: subject, date and source URL are part of the lookup,
                # so in practice only the extract of a known stance changes.
                stance = StanceModel.query.filter_by(
                    representative=representative,
                    matter=matter,
                    subject=values["subject"],
                    date=values["date"],
                    source_url=values["source_url"],
                ).first()
                if stance is None:
                    stance = StanceModel(
                        representative=representative,
                        matter=matter,
                        **values,
                    )
                    g.messages.append(f"Stance for {name} added.")
                    stance.save()
                elif _update_fields(stance, values):
                    g.messages.append(f"{name} updated.")
                    stance.save()
        return self.render("admin/import.html")


class ImportTypeView(BaseView):
    """Admin view importing entity types, matched on their code."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import the rows."""
        headers = {"name": None, "code": None}
        reader = default_import(headers)
        g.what = {
            "title": "Import types",
            "description": "Importing types will add unknown ones and update known"
            " ones. If a type is not present in imported file it will not be"
            " deleted.",
            "endpoint": "types.index",
            "formats": [
                *_COMMON_FORMATS,
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
            ],
            "examples": [
                "code,name",
                "GP,Groupe parlementaire",
                "Member,Membre",
                "CEO,Chief Executive Officer",
            ],
        }
        if not g.errors and reader is not None:
            for row in _rows(reader):
                name = row[headers["name"]]
                code = row[headers["code"]]
                type_ = TypeModel.query.filter_by(code=code).first()
                if type_ is None:
                    type_ = TypeModel(
                        name=name,
                        slug=slugify(name),
                        code=code,
                    )
                    g.messages.append(f"{name} added.")
                    type_.save()
                elif type_.name != name:
                    type_.name = name
                    type_.slug = slugify(name)
                    g.messages.append(f"{name} updated.")
                    type_.save()
        return self.render("admin/import.html")