814 lines
34 KiB
Python
814 lines
34 KiB
Python
# encoding: utf-8
|
|
|
|
import csv
|
|
from datetime import datetime
|
|
from importlib import import_module, invalidate_caches
|
|
from io import StringIO
|
|
|
|
from flask import current_app, g, request, redirect, url_for
|
|
from flask_admin import BaseView, expose
|
|
from slugify import slugify
|
|
from sqlalchemy import or_
|
|
|
|
from app.form.admin import ImportForm
|
|
from app import model
|
|
|
|
|
|
def default_import(headers):
    """Prepare a CSV import request and map expected columns to indexes.

    Sets ``g.form``, ``g.errors`` and ``g.messages`` for the current
    request. When the form validates, the uploaded ``filename`` file is
    decoded as UTF-8 and its first row is matched, case-insensitively,
    against the keys of ``headers``.

    ``headers`` is updated in place:

    * a key found in the first row receives its column index;
    * a key still ``None`` afterwards (required column) is reported in
      ``g.errors``;
    * a key still ``"optional"`` afterwards is reset to ``None``.

    Args:
        headers: dict mapping lower-case column names to ``None`` (required)
            or ``"optional"``.

    Returns:
        The ``csv.reader`` positioned after the header row, or ``None`` when
        the form was not submitted or did not validate.
    """
    g.form = ImportForm()
    g.errors = []
    g.messages = []
    if not g.form.validate_on_submit():
        return None

    # "utf-8-sig" drops a leading byte-order mark (written by many
    # spreadsheet exports); without it the first header would never match.
    content = request.files["filename"].read().decode("utf-8-sig")
    reader = csv.reader(StringIO(content))

    for index, cell in enumerate(next(reader, [])):
        # Tolerate padding around header names ("name, city").
        column = cell.strip().lower()
        if column in headers:
            headers[column] = index

    for header in list(headers):
        if headers[header] is None:
            g.errors.append(f"Column {header} not found.")
        elif headers[header] == "optional":
            # Absent optional column: callers test for None.
            headers[header] = None

    return reader
|
|
|
|
|
|
class ImportAddressView(BaseView):
    """Admin view that creates or updates addresses from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        Rows are matched to existing addresses by the slug of their ``name``
        column. An optional column that is absent from the file yields
        ``None`` for every row.
        """
        headers = {
            "name": None,
            "country_code": None,
            "number": "optional",
            "street": "optional",
            "miscellaneous": "optional",
            "city": None,
            "zipcode": None,
            "building": "optional",
            "floor": "optional",
            "stair": "optional",
            "office": "optional",
            "latitude": "optional",
            "longitude": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import addresses",
            "description": "Importing addresses will add unknown ones and update known"
            " ones. If an address is not present in imported file it will not be"
            " deleted.",
            "endpoint": "addresses.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name' and be the unique name of the address.",
                "One column MUST be 'country_code'.",
                "One column MUST be 'city' and be the name of a city.",
                "One column MUST be 'zipcode' and be the zipcode of this city.",
            ],
            "examples": [
                "name,country_code,city,zipcode,building,floor",
                "Assemblée Nationale,FR,Paris,75000,,",
                "Victor's Office,BE,Bruxelles,1000,A,2",
                "White House,US,Washington DC,20500-0003,,",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            # Plain text columns copied as-is onto the model, in the order
            # they are compared and updated.
            text_fields = (
                "number",
                "street",
                "miscellaneous",
                "city",
                "zipcode",
                "building",
                "floor",
                "stair",
                "office",
                "latitude",
                "longitude",
            )
            for row in reader:
                name = row[headers["name"]]
                address_country = model.CountryModel.query.filter_by(
                    code=row[headers["country_code"]].upper()
                ).first()
                values = {"country": address_country}
                for field in text_fields:
                    # headers[field] is None when an optional column is
                    # missing; indexing the row with it would raise.
                    position = headers[field]
                    values[field] = row[position] if position is not None else None
                # filter_by takes keyword arguments: ``slug=``, not ``slug==``.
                address = model.AddressModel.query.filter_by(
                    slug=slugify(name),
                ).first()
                if address is None:
                    address = model.AddressModel(
                        name=name,
                        slug=slugify(name),
                        **values,
                    )
                    g.messages.append(f"{name} added.")
                    address.save()
                else:
                    updated = False
                    for field, value in values.items():
                        if getattr(address, field) != value:
                            setattr(address, field, value)
                            updated = True
                    if updated:
                        g.messages.append(f"{name} updated.")
                        address.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportContactView(BaseView):
    """Admin view that creates or updates contacts from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        A contact is identified by the slug of its ``name`` column together
        with its representative. Rows whose representative slug is unknown
        are skipped and reported in ``g.errors``.
        """
        headers = {
            "representative_slug": None,
            "address_slug": "optional",
            "name": None,
            "value": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import contacts",
            "description": "Importing contacts will add unknown ones and update known"
            " ones. If a contact is not present in imported file it will not be"
            " deleted.",
            "endpoint": "contacts.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name' and be the name of the contact.",
                "One column MUST be 'value' and be the value of this contact.",
                "One column MUST be 'representative_slug' and be the slug identifying one representative.",
                "One column COULD be 'address_slug' and be the slug identifying an address.",
            ],
            "examples": [
                "representative_slug,name,value,address_slug",
                "victor-hugo,Twitter,@vic,",
                "barack-obama,Phone,123456789,white-house",
                "barack-obama,Email,barack@obama.us,",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                name = row[headers["name"]]
                value = row[headers["value"]]
                representative_slug = row[headers["representative_slug"]]
                contact_representative = model.RepresentativeModel.query.filter_by(
                    slug=representative_slug,
                ).first()
                if contact_representative is None:
                    # The lookup below needs the representative's id, so an
                    # unknown slug cannot be imported.
                    # NOTE(review): assumes the template displays g.errors
                    # after processing, as for missing columns - confirm.
                    g.errors.append(
                        f"Representative {representative_slug} not found."
                    )
                    continue
                if headers["address_slug"] is not None:
                    contact_address = model.AddressModel.query.filter_by(
                        slug=row[headers["address_slug"]],
                    ).first()
                else:
                    contact_address = None
                contact = model.ContactModel.query.filter_by(
                    slug=slugify(name),
                    representative_id=contact_representative.id,
                ).first()
                if contact is None:
                    contact = model.ContactModel(
                        representative=contact_representative,
                        address=contact_address,
                        name=name,
                        slug=slugify(name),
                        value=value,
                    )
                    g.messages.append(f"{name} added.")
                    contact.save()
                else:
                    updated = False
                    if contact.address != contact_address:
                        contact.address = contact_address
                        updated = True
                    if contact.value != value:
                        contact.value = value
                        updated = True
                    if updated:
                        g.messages.append(f"{name} updated.")
                        contact.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportDecisionView(BaseView):
    """Admin view that creates or updates decisions from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        A decision is identified by its representative and recommendation
        (both looked up by slug); only its ``value`` is updated.
        """
        headers = {
            "representative_slug": None,
            "recommendation_slug": None,
            "value": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import decisions",
            "description": "Importing decisions will add unknown ones and update known"
            " ones. If a decision is not present in imported file it will not be"
            " deleted.",
            "endpoint": "decisions.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'value' and be the value of the decision.",
                "One column MUST be 'representative_slug' and be the slug identifying one representative.",
                "One column MUST be 'recommendation_slug' and be the slug identifying one recommendation.",
            ],
            "examples": [
                "representative_slug,recommendation_slug,value",
                "victor-hugo,no-food-for-myself,Yes",
                "",
                "",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                # The decision value comes from the row's "value" column.
                value = row[headers["value"]]
                decision_representative = model.RepresentativeModel.query.filter_by(
                    slug=row[headers["representative_slug"]],
                ).first()
                decision_recommendation = model.RecommendationModel.query.filter_by(
                    slug=row[headers["recommendation_slug"]],
                ).first()
                decision = model.DecisionModel.query.filter_by(
                    representative=decision_representative,
                    recommendation=decision_recommendation,
                ).first()
                if decision is None:
                    decision = model.DecisionModel(
                        representative=decision_representative,
                        recommendation=decision_recommendation,
                        value=value,
                    )
                    g.messages.append(f"Decision for {row[headers['recommendation_slug']]} by {row[headers['representative_slug']]} added.")
                    decision.save()
                else:
                    if decision.value != value:
                        decision.value = value
                        g.messages.append(f"Decision for {row[headers['recommendation_slug']]} by {row[headers['representative_slug']]} updated.")
                        decision.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportEntityView(BaseView):
    """Admin view that creates or updates entities from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        Entities are matched by their ``code`` column. ``start`` and ``end``
        are parsed as ``YYYY-MM-DD`` dates, an empty cell meaning ``None``.
        """
        headers = {
            "type_code": None,
            "country_code": None,
            "name": None,
            "code": None,
            "picture": "optional",
            "start": None,
            "end": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import entities",
            "description": "Importing entities will add unknown ones and update known"
            " ones. If an entity is not present in imported file it will not be"
            " deleted.",
            "endpoint": "entities.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name' and be the name of the entity.",
                "One column MUST be 'code' and be the unique code of this entity.",
                "One column COULD be 'picture' and be a link to the logo of the entity.",
                "One column MUST be 'start' and be a 'YYYY-MM-DD' date.",
                "One column MUST be 'end' and be null or a 'YYYY-MM-DD' date.",
                "One column COULD be 'type_code' and be a known type code.",
                "One column COULD be 'country_code' and be a known country code.",
            ],
            "examples": [
                "name,type_code,country_code,code,picture,start,end",
                "Gouvernement,GOV,US,WH,,2021-01-01,2021-04-01",
                "",
                "",
            ],
        }
        # Nothing to import: column errors, or no valid upload yet.
        if g.errors or reader is None:
            return self.render("admin/import.html")

        date_format = "%Y-%m-%d"
        for record in reader:
            found_type = model.TypeModel.query.filter_by(
                code=record[headers["type_code"]]
            ).first()
            found_country = model.CountryModel.query.filter_by(
                code=record[headers["country_code"]].upper()
            ).first()
            entity = model.EntityModel.query.filter_by(
                code=record[headers["code"]]
            ).first()

            raw_start = record[headers["start"]]
            start_date = datetime.strptime(raw_start, date_format) if raw_start != "" else None
            raw_end = record[headers["end"]]
            end_date = datetime.strptime(raw_end, date_format) if raw_end != "" else None
            # Optional column: None when the file has no "picture" header.
            picture = None if headers["picture"] is None else record[headers["picture"]]

            if entity is None:
                entity = model.EntityModel(
                    type=found_type,
                    country=found_country,
                    name=record[headers["name"]],
                    slug=slugify(record[headers["name"]]),
                    code=record[headers["code"]],
                    picture=picture,
                    start=start_date,
                    end=end_date,
                )
                g.messages.append(f"{record[headers['name']]} added.")
                entity.save()
                continue

            wanted = {
                "type": found_type,
                "country": found_country,
                "name": record[headers["name"]],
                "picture": picture,
                "start": start_date,
                "end": end_date,
            }
            changed = False
            for attribute, target in wanted.items():
                if getattr(entity, attribute) == target:
                    continue
                setattr(entity, attribute, target)
                if attribute == "name":
                    # The slug is derived from the name and follows it.
                    entity.slug = slugify(target)
                changed = True
            if changed:
                g.messages.append(f"{record[headers['name']]} updated.")
                entity.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportMatterView(BaseView):
    """Admin view that creates or updates matters from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        Matters are matched by the slug of their ``name`` column; only the
        description is updated on an existing matter. When the optional
        ``description`` column is absent the description is ``None``.
        """
        headers = {
            "name": None,
            "description": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import matters",
            "description": "Importing matters will add unknown ones and update known"
            " ones. If a matter is not present in imported file it will not be"
            " deleted.",
            "endpoint": "matters.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name' and be the unique name of the matter.",
                # The column actually read is "description".
                "One column COULD be 'description' and be the description of the matter.",
            ],
            "examples": [
                # The required header is "name", not "matter".
                "name,description",
                "How to ?,Well an how-to is a strange case bla bla bla.",
                "",
                "",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                name = row[headers["name"]]
                matter = model.MatterModel.query.filter_by(
                    slug=slugify(name),
                ).first()
                if headers["description"] is not None:
                    description = row[headers["description"]]
                else:
                    description = None
                if matter is None:
                    # Build a MatterModel: the lookup above queries matters,
                    # and "description" is the field being imported.
                    matter = model.MatterModel(
                        name=name,
                        slug=slugify(name),
                        description=description,
                    )
                    g.messages.append(f"Matter {name} added.")
                    matter.save()
                else:
                    if matter.description != description:
                        matter.description = description
                        g.messages.append(f"Matter {name} updated.")
                        matter.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportMembershipView(BaseView):
    """Admin view that creates or updates memberships from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        A membership is identified by representative, entity, role and start
        date; only its end date is updated. ``start`` and ``end`` are parsed
        as ``YYYY-MM-DD`` dates, an empty cell meaning ``None``.
        """
        headers = {
            "representative_slug": None,
            "entity_code": None,
            "role_code": None,
            "start": None,
            "end": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import memberships",
            "description": "Importing memberships will add unknown ones and update"
            " known ones. If a membership is not present in imported file it"
            " will not be deleted.",
            "endpoint": "memberships.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'representative_slug'.",
                "One column MUST be 'entity_code'.",
                "One column MUST be 'role_code'.",
                "One column MUST be 'start'.",
                "One column MUST be 'end'.",
            ],
            # Examples use the columns this import requires.
            "examples": [
                "representative_slug,entity_code,role_code,start,end",
                "barack-obama,WH,CEO,2009-01-20,2017-01-20",
                "victor-hugo,WH,Member,2021-01-01,",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                representative = model.RepresentativeModel.query.filter_by(
                    slug=slugify(row[headers["representative_slug"]]),
                ).first()
                entity = model.EntityModel.query.filter_by(
                    code=row[headers["entity_code"]],
                ).first()
                role = model.RoleModel.query.filter_by(
                    code=row[headers["role_code"]],
                ).first()
                if row[headers["start"]] != "":
                    start_date = datetime.strptime(row[headers["start"]], "%Y-%m-%d")
                else:
                    start_date = None
                if row[headers["end"]] != "":
                    end_date = datetime.strptime(row[headers["end"]], "%Y-%m-%d")
                else:
                    end_date = None
                membership = model.MembershipModel.query.filter_by(
                    representative=representative,
                    entity=entity,
                    role=role,
                    start=start_date,
                ).first()
                if membership is None:
                    membership = model.MembershipModel(
                        representative=representative,
                        entity=entity,
                        role=role,
                        start=start_date,
                        end=end_date,
                    )
                    g.messages.append(f"Membership of {row[headers['representative_slug']]} added.")
                    membership.save()
                else:
                    if membership.end != end_date:
                        membership.end = end_date
                        g.messages.append(f"Membership of {row[headers['representative_slug']]} updated.")
                        membership.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportRecommendationView(BaseView):
    """Admin view that creates or updates recommendations from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        A recommendation is identified by matter, entity and ``code``.
        ``date`` is parsed as ``YYYY-MM-DD`` (empty cell: ``None``). When the
        optional ``description`` column is absent the description is
        ``None``; when ``weight`` is absent or empty the weight is ``1``.
        """
        headers = {
            "matter_slug": None,
            "entity_code": None,
            "name": None,
            "code": None,
            "date": None,
            "description": "optional",
            "value": None,
            "weight": "optional",
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import recommendations",
            "description": "Importing recommendations will add unknown ones and update"
            " known ones. If a recommendation is not present in imported file it"
            " will not be deleted.",
            "endpoint": "recommendations.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'matter_slug'.",
                "One column MUST be 'entity_code'.",
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
                "One column MUST be 'date'.",
                "One column COULD be 'description'.",
                "One column MUST be 'value'.",
                "One column COULD be 'weight'.",
            ],
            # Examples use the columns this import reads.
            "examples": [
                "matter_slug,entity_code,name,code,date,description,value,weight",
                "how-to,WH,No food for myself,NFFM,2021-02-01,,Yes,2",
                "how-to,WH,Food for everyone,FFE,2021-03-01,Some description,No,",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                name = row[headers["name"]]
                matter = model.MatterModel.query.filter_by(
                    slug=row[headers["matter_slug"]],
                ).first()
                entity = model.EntityModel.query.filter_by(
                    code=row[headers["entity_code"]],
                ).first()
                if row[headers["date"]] != "":
                    recommendation_date = datetime.strptime(row[headers["date"]], "%Y-%m-%d")
                else:
                    recommendation_date = None
                # Optional columns: test the header index, not the cell -
                # the index is None when the column is absent.
                if headers["description"] is not None:
                    description = row[headers["description"]]
                else:
                    description = None
                if headers["weight"] is not None and row[headers["weight"]] != "":
                    weight = int(row[headers["weight"]])
                else:
                    weight = 1
                recommendation = model.RecommendationModel.query.filter_by(
                    matter=matter,
                    entity=entity,
                    code=row[headers["code"]],
                ).first()
                if recommendation is None:
                    recommendation = model.RecommendationModel(
                        matter=matter,
                        entity=entity,
                        name=name,
                        slug=slugify(name),
                        code=row[headers["code"]],
                        date=recommendation_date,
                        description=description,
                        value=row[headers["value"]],
                        weight=weight,
                    )
                    g.messages.append(f"{name} added.")
                    recommendation.save()
                else:
                    updated = False
                    if recommendation.name != name:
                        recommendation.name = name
                        recommendation.slug = slugify(name)
                        updated = True
                    if recommendation.date != recommendation_date:
                        recommendation.date = recommendation_date
                        updated = True
                    if recommendation.description != description:
                        recommendation.description = description
                        updated = True
                    if recommendation.value != row[headers["value"]]:
                        recommendation.value = row[headers["value"]]
                        updated = True
                    if recommendation.weight != weight:
                        recommendation.weight = weight
                        updated = True
                    if updated:
                        g.messages.append(f"{name} updated.")
                        recommendation.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportRoleView(BaseView):
    """Admin view that creates or renames roles from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        Roles are matched by their ``code`` column; an existing role gets its
        name and slug refreshed when the name differs.
        """
        headers = {"name": None, "code": None}
        reader = default_import(headers)
        g.what = {
            "title": "Import roles",
            "description": "Importing roles will add unknown ones and update known"
            " ones. If a role is not present in imported file it will not be"
            " deleted.",
            "endpoint": "roles.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
            ],
            "examples": [
                "code,name",
                "GP,Groupe parlementaire",
                "Member,Membre",
                "CEO,Chief Executive Officer",
            ],
        }
        # Nothing to import: column errors, or no valid upload yet.
        if g.errors or reader is None:
            return self.render("admin/import.html")

        for record in reader:
            code = record[headers["code"]]
            existing = model.RoleModel.query.filter_by(code=code).first()
            label = record[headers["name"]]
            if existing is None:
                created = model.RoleModel(name=label, slug=slugify(label), code=code)
                g.messages.append(f"{label} added.")
                created.save()
            elif existing.name != label:
                existing.name = label
                existing.slug = slugify(label)
                g.messages.append(f"{label} updated.")
                existing.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportStanceView(BaseView):
    """Admin view that creates or updates stances from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        For each row the representative (by ``slug``) and the matter (by the
        slug of ``matter``) are created when missing. A stance is identified
        by representative, matter, subject, date and source URL. ``date`` is
        parsed as ``YYYY-MM-DD``, an empty cell meaning ``None``.
        """
        headers = {
            "name": None,
            "slug": None,
            "matter": None,
            "subject": None,
            "date": None,
            "extract": None,
            "source_url": None,
        }
        reader = default_import(headers)
        g.what = {
            "title": "Import stances",
            "description": "Importing stances will add unknown ones and update"
            " known ones. If a stance is not present in imported file it"
            " will not be deleted.",
            "endpoint": "stances.index",
            # Formats and examples list the columns this import requires.
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name' and be the name of the representative.",
                "One column MUST be 'slug' and be the slug identifying the representative.",
                "One column MUST be 'matter' and be the name of the matter.",
                "One column MUST be 'subject'.",
                "One column MUST be 'date' and be empty or a 'YYYY-MM-DD' date.",
                "One column MUST be 'extract'.",
                "One column MUST be 'source_url'.",
            ],
            "examples": [
                "name,slug,matter,subject,date,extract,source_url",
                "Victor Hugo,victor-hugo,How to ?,Speech,2021-01-01,Some quote,https://example.org/speech",
                "Barack Obama,barack-obama,How to ?,Interview,,Another quote,https://example.org/interview",
            ],
        }
        if len(g.errors) == 0 and reader is not None:
            for row in reader:
                # Representative: created on the fly when the slug is unknown.
                representative = model.RepresentativeModel.query.filter_by(
                    slug=row[headers["slug"]],
                ).first()
                if representative is None:
                    representative = model.RepresentativeModel(
                        name=row[headers["name"]],
                        slug=row[headers["slug"]],
                    )
                    representative.save()
                # Matter: created on the fly when its slug is unknown.
                matter = model.MatterModel.query.filter_by(
                    slug=slugify(row[headers["matter"]]),
                ).first()
                if matter is None:
                    matter = model.MatterModel(
                        name=row[headers["matter"]],
                        slug=slugify(row[headers["matter"]]),
                    )
                    matter.save()
                if row[headers["date"]] != "":
                    stance_date = datetime.strptime(row[headers["date"]], "%Y-%m-%d")
                else:
                    stance_date = None
                # Stance
                stance = model.StanceModel.query.filter_by(
                    representative=representative,
                    matter=matter,
                    subject=row[headers["subject"]],
                    date=stance_date,
                    source_url=row[headers["source_url"]],
                ).first()
                if stance is None:
                    stance = model.StanceModel(
                        representative=representative,
                        matter=matter,
                        subject=row[headers["subject"]],
                        date=stance_date,
                        extract=row[headers["extract"]],
                        source_url=row[headers["source_url"]],
                    )
                    g.messages.append(f"Stance for {row[headers['name']]} added.")
                    stance.save()
                else:
                    updated = False
                    if stance.subject != row[headers["subject"]]:
                        stance.subject = row[headers["subject"]]
                        updated = True
                    if stance.date != stance_date:
                        stance.date = stance_date
                        updated = True
                    if stance.extract != row[headers["extract"]]:
                        stance.extract = row[headers["extract"]]
                        updated = True
                    if stance.source_url != row[headers["source_url"]]:
                        stance.source_url = row[headers["source_url"]]
                        updated = True
                    if updated:
                        g.messages.append(f"{row[headers['name']]} updated.")
                        stance.save()

        return self.render("admin/import.html")
|
|
|
|
|
|
class ImportTypeView(BaseView):
    """Admin view that creates or renames types from an uploaded CSV."""

    @expose("/", methods=["GET", "POST"])
    def index(self):
        """Render the import page and, on a valid upload, import each row.

        Types are matched by their ``code`` column; an existing type gets its
        name and slug refreshed when the name differs.
        """
        headers = {"name": None, "code": None}
        reader = default_import(headers)
        g.what = {
            "title": "Import types",
            "description": "Importing types will add unknown ones and update known"
            " ones. If a type is not present in imported file it will not be"
            " deleted.",
            "endpoint": "types.index",
            "formats": [
                "File format accepted is CSV (Comma Separated Values).",
                "First line chould be column headers.",
                "Other lines are values.",
                "One column MUST be 'name'.",
                "One column MUST be 'code'.",
            ],
            "examples": [
                "code,name",
                "GP,Groupe parlementaire",
                "Member,Membre",
                "CEO,Chief Executive Officer",
            ],
        }
        # Nothing to import: column errors, or no valid upload yet.
        if g.errors or reader is None:
            return self.render("admin/import.html")

        for record in reader:
            code = record[headers["code"]]
            existing = model.TypeModel.query.filter_by(code=code).first()
            label = record[headers["name"]]
            if existing is None:
                created = model.TypeModel(name=label, slug=slugify(label), code=code)
                g.messages.append(f"{label} added.")
                created.save()
            elif existing.name != label:
                existing.name = label
                existing.slug = slugify(label)
                g.messages.append(f"{label} updated.")
                existing.save()

        return self.render("admin/import.html")
|