agenda_culturel/src/agenda_culturel/models.py

974 lines
40 KiB
Python

from django.db import models
from django_better_admin_arrayfield.models.fields import ArrayField
from django.utils.translation import gettext_lazy as _
from django.utils.safestring import mark_safe
from django.template.defaultfilters import slugify
from django.urls import reverse
from colorfield.fields import ColorField
from ckeditor.fields import RichTextField
from urllib.parse import urlparse
import urllib.request
import os
from django.core.files import File
from django.utils import timezone
from django.contrib.postgres.search import TrigramSimilarity
from django.db.models import Q
import recurrence.fields
import recurrence
import copy
import unicodedata
from django.template.defaultfilters import date as _date
from datetime import time, timedelta, date
from django.utils.timezone import datetime
from django.utils import timezone
from .calendar import CalendarList, CalendarDay
import logging
logger = logging.getLogger(__name__)
def remove_accents(input_str):
nfkd_form = unicodedata.normalize('NFKD', input_str)
return u"".join([c for c in nfkd_form if not unicodedata.combining(c)])
class StaticContent(models.Model):
name = models.CharField(verbose_name=_('Name'), help_text=_('Category name'), max_length=512, unique=True)
text = RichTextField(verbose_name=_('Content'), help_text=_('Text as shown to the visitors'))
url_path = models.CharField(verbose_name=_('URL path'), help_text=_('URL path where the content is included.'))
def __str__(self):
return self.name
def get_absolute_url(self):
return self.url_path
class Category(models.Model):
default_name = "Sans catégorie"
default_alt_name = "Événements non catégorisés"
default_codename = ""
default_css_class = "cat-nocat"
default_color = "#aaaaaa"
COLOR_PALETTE = [
("#ea5545", "color 1"),
("#f46a9b", "color 2"),
("#ef9b20", "color 3"),
("#edbf33", "color 4"),
("#ede15b", "color 5"),
("#bdcf32", "color 6"),
("#87bc45", "color 7"),
("#27aeef", "color 8"),
("#b33dc6", "color 9")]
name = models.CharField(verbose_name=_('Name'), help_text=_('Category name'), max_length=512)
alt_name = models.CharField(verbose_name=_('Alternative Name'), help_text=_('Alternative name used with a time period'), max_length=512)
codename = models.CharField(verbose_name=_('Short name'), help_text=_('Short name of the category'), max_length=3)
color = ColorField(verbose_name=_('Color'), help_text=_('Color used as background for the category'), blank=True, null=True)
def save(self, *args, **kwargs):
if self.color is None:
existing_colors = [c.color for c in Category.objects.all()]
if len(existing_colors) > len(Category.COLOR_PALETTE):
self.color = "#CCCCCC"
else:
for c, n in Category.COLOR_PALETTE:
if c not in existing_colors:
self.color = c
break
if self.color is None:
self.color = "#CCCCCC"
super(Category, self).save(*args, **kwargs)
def get_default_category():
try:
default, created = Category.objects.get_or_create(name=Category.default_name,
alt_name=Category.default_alt_name,
codename=Category.default_codename,
color=Category.default_color)
return default
except:
return None
def get_default_category_id():
cat = Category.get_default_category()
if cat:
return cat.id
else:
return None
def css_class(self):
return "cat-" + str(self.id)
def __str__(self):
return self.name
class Meta:
verbose_name = _('Category')
verbose_name_plural = _('Categories')
class DuplicatedEvents(models.Model):
def nb_duplicated(self):
return Event.objects.filter(possibly_duplicated=self).count()
def get_duplicated(self):
return Event.objects.filter(possibly_duplicated=self)
def merge_into(self, other):
# for all objects associated to this group
for e in Event.objects.filter(possibly_duplicated=self):
# change their group membership
e.possibly_duplicated = other
# save them
e.save()
# then delete the empty group
self.delete()
def merge_groups(groups):
if len(groups) == 0:
return None
elif len(groups) == 1:
return groups[0]
else:
result = groups[0]
for g in groups[1:]:
g.merge_into(result)
return result
def get_item_comparison(self, attr):
values = [getattr(e, attr) for e in self.get_duplicated()]
values = ["" if v is None else v for v in values]
if len(set([str(v) for v in values])) == 1:
return { "similar": True, "key": attr, "values": values[0] }
else:
return { "similar": False, "key": attr, "values": values }
def get_items_comparison(self):
return [self.get_item_comparison(e) for e in Event.data_fields(all=True)]
class Event(models.Model):
class STATUS(models.TextChoices):
PUBLISHED = "published", _("Published")
DRAFT = "draft", _("Draft")
TRASH = "trash", _("Trash")
created_date = models.DateTimeField(editable=False)
imported_date = models.DateTimeField(blank=True, null=True)
modified_date = models.DateTimeField(blank=True, null=True)
moderated_date = models.DateTimeField(blank=True, null=True)
recurrence_dtstart = models.DateTimeField(editable=False, blank=True, null=True)
recurrence_dtend = models.DateTimeField(editable=False, blank=True, null=True)
title = models.CharField(verbose_name=_('Title'), help_text=_('Short title'), max_length=512)
status = models.CharField(_("Status"), max_length=20, choices=STATUS.choices, default=STATUS.DRAFT)
category = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category of the event'), null=True, default=Category.get_default_category_id(), on_delete=models.SET_DEFAULT)
start_day = models.DateField(verbose_name=_('Day of the event'), help_text=_('Day of the event'))
start_time = models.TimeField(verbose_name=_('Starting time'), help_text=_('Starting time'), blank=True, null=True)
end_day = models.DateField(verbose_name=_('End day of the event'), help_text=_('End day of the event, only required if different from the start day.'), blank=True, null=True)
end_time = models.TimeField(verbose_name=_('Final time'), help_text=_('Final time'), blank=True, null=True)
recurrences = recurrence.fields.RecurrenceField(verbose_name=_("Recurrence"), include_dtstart=False, blank=True, null=True)
location = models.CharField(verbose_name=_('Location'), help_text=_('Address of the event'), max_length=512, default="")
description = models.TextField(verbose_name=_('Description'), help_text=_('General description of the event'), blank=True, null=True)
local_image = models.ImageField(verbose_name=_('Illustration (local image)'), help_text=_("Illustration image stored in the agenda server"), max_length=1024, blank=True, null=True)
image = models.URLField(verbose_name=_('Illustration'), help_text=_("URL of the illustration image"), max_length=1024, blank=True, null=True)
image_alt = models.CharField(verbose_name=_('Illustration description'), help_text=_('Alternative text used by screen readers for the image'), blank=True, null=True, max_length=1024)
import_sources = ArrayField(models.CharField(max_length=512), verbose_name=_('Importation source'), help_text=_("Importation source used to detect removed entries."), blank=True, null=True)
uuids = ArrayField(models.CharField(max_length=512), verbose_name=_('UUIDs'), help_text=_("UUIDs from import to detect duplicated entries."), blank=True, null=True)
reference_urls = ArrayField(models.URLField(max_length=512), verbose_name=_('URLs'), help_text=_("List of all the urls where this event can be found."), blank=True, null=True)
tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Tags'), help_text=_("A list of tags that describe the event."), blank=True, null=True)
possibly_duplicated = models.ForeignKey(DuplicatedEvents, verbose_name=_('Possibly duplicated'), on_delete=models.SET_NULL, null=True, blank=True)
def get_consolidated_end_day(self, intuitive=True):
if intuitive:
end_day = self.get_consolidated_end_day(False)
if end_day != self.start_day and self.end_time and self.end_time < time(8):
return end_day + timedelta(days=-1)
else:
return end_day
else:
return self.end_day if self.end_day else self.start_day
def get_dates(self):
first = self.start_day
last = self.get_consolidated_end_day()
return [first + timedelta(n) for n in range(int((last - first).days) + 1)]
def get_nb_events_same_dates(self):
first = self.start_day
last = self.get_consolidated_end_day()
calendar = CalendarList(first, last, exact=True)
return [(len(d.events), d.date) for dstr, d in calendar.calendar_days.items()]
def is_single_day(self, intuitive=True):
return self.start_day == self.get_consolidated_end_day(intuitive)
def contains_date(self, d, intuitive=True):
return d >= self.start_day and d <= self.get_consolidated_end_day(intuitive)
def get_absolute_url(self):
return reverse("view_event", kwargs={"year": self.start_day.year,
"month": self.start_day.month,
"day": self.start_day.day,
"pk": self.pk, "extra": slugify(self.title)})
def __str__(self):
return _date(self.start_day) + ": " + self.title
class Meta:
verbose_name = _('Event')
verbose_name_plural = _('Events')
permissions = [("set_duplicated_event", "Can set an event as duplicated")]
def get_all_tags():
try:
tags = list(Event.objects.values_list('tags', flat = True))
except:
tags = []
uniq_tags = set()
for t in tags:
if t is not None:
uniq_tags = uniq_tags | set(t)
return list(uniq_tags)
def is_draft(self):
return self.status == Event.STATUS.DRAFT
def is_published(self):
return self.status == Event.STATUS.PUBLISHED
def is_trash(self):
return self.status == Event.STATUS.TRASH
def modified(self):
return self.modified_date is None or abs((self.modified_date - self.created_date).total_seconds()) > 1
def nb_draft_events():
return Event.objects.filter(status=Event.STATUS.DRAFT).count()
def download_image(self):
# first download file
a = urlparse(self.image)
basename = os.path.basename(a.path)
try:
tmpfile, _ = urllib.request.urlretrieve(self.image)
except:
return None
# if the download is ok, then create the corresponding file object
self.local_image = File(name=basename, file=open(tmpfile, "rb"))
def set_skip_duplicate_check(self):
self.skip_duplicate_check = True
def is_skip_duplicate_check(self):
return hasattr(self, "skip_duplicate_check")
def is_in_importation_process(self):
return hasattr(self, "in_importation_process")
def set_in_importation_process(self):
self.in_importation_process = True
def update_modification_dates(self):
now = timezone.now()
if not self.id:
self.created_date = now
if self.is_in_importation_process():
self.imported_date = now
if self.modified_date is None or not self.is_in_importation_process():
self.modified_date = now
def get_recurrence_at_date(self, year, month, day):
dtstart = timezone.make_aware(datetime(year, month, day, 0, 0), timezone.get_default_timezone())
recurrences = self.get_recurrences_between(dtstart, dtstart)
if len(recurrences) == 0:
return self
else:
return recurrences[0]
# return a copy of the current object for each recurrence between first an last date (included)
def get_recurrences_between(self, firstdate, lastdate):
if not self.has_recurrences():
return [self]
else:
result = []
dtstart = timezone.make_aware(datetime.combine(self.start_day, time()), timezone.get_default_timezone())
self.recurrences.dtstart = dtstart
for d in self.recurrences.between(firstdate, lastdate, inc=True, dtstart=dtstart):
c = copy.deepcopy(self)
c.start_day = d.date()
if c.end_day is not None:
shift = d.date() - self.start_day
c.end_day += shift
result.append(c)
return result
def has_recurrences(self):
# TODO: see https://forge.chapril.org/jmtrivial/agenda_culturel/issues/65
return self.recurrences is not None and len(self.recurrences.rrules) != 0
def update_recurrence_dtstartend(self):
sday = date.fromisoformat(self.start_day) if isinstance(self.start_day, str) else self.start_day
eday = date.fromisoformat(self.end_day) if isinstance(self.end_day, str) else self.end_day
stime = time.fromisoformat(self.start_time) if isinstance(self.start_time, str) else time() if self.start_time is None else self.start_time
etime = time.fromisoformat(self.end_time) if isinstance(self.end_time, str) else time() if self.end_time is None else self.end_time
self.recurrence_dtstart = datetime.combine(sday, stime)
if not self.has_recurrences():
if self.end_day is None:
self.dtend = None
else:
self.recurrence_dtend = datetime.combine(eday, etime)
else:
if self.recurrences.rrules[0].until is None and self.recurrences.rrules[0].count is None:
self.recurrence_dtend = None
else:
self.recurrences.dtstart = datetime.combine(sday, time())
occurrence = self.recurrences.occurrences()
try:
self.recurrence_dtend = occurrence[-1]
if self.recurrences.dtend is not None and self.recurrences.dtstart is not None:
self.recurrence_dtend += self.recurrences.dtend - self.recurrences.dtstart
except:
self.recurrence_dtend = self.recurrence_dtstart
def prepare_save(self):
self.update_modification_dates()
self.update_recurrence_dtstartend()
# if the image is defined but not locally downloaded
if self.image and not self.local_image:
self.download_image()
# try to detect category
if self.is_in_importation_process():
CategorisationRule.apply_rules(self)
def save(self, *args, **kwargs):
self.prepare_save()
# check for similar events if no duplicated is known only if the event is created
if self.pk is None and self.possibly_duplicated is None and not self.is_skip_duplicate_check():
# and if this is not an importation process
if not self.is_in_importation_process():
similar_events = self.find_similar_events()
# if it exists similar events, add this relation to the event
if len(similar_events) != 0:
self.set_possibly_duplicated(similar_events)
# delete duplicated group if it's only with one element
if self.possibly_duplicated is not None and self.possibly_duplicated.nb_duplicated() == 1:
self.possibly_duplicated.delete()
self.possibly_duplicated = None
super().save(*args, **kwargs)
def from_structure(event_structure, import_source = None):
if "category" in event_structure and event_structure["category"] is not None:
event_structure["category"] = Category.objects.get(name=event_structure["category"])
if "published" in event_structure and event_structure["published"] is not None:
if event_structure["published"]:
event_structure["status"] = Event.STATUS.PUBLISHED
else:
event_structure["status"] = Event.STATUS.DRAFT
del event_structure["published"]
else:
event_structure["status"] = Event.STATUS.DRAFT
if "url_human" in event_structure and event_structure["url_human"] is not None:
event_structure["reference_urls"] = [event_structure["url_human"]]
del event_structure["url_human"]
if "last_modified" in event_structure and event_structure["last_modified"] is not None:
d = datetime.fromisoformat(event_structure["last_modified"])
if d.year == 2024 and d.month > 2:
logger.warning("last modified {}".format(d))
if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
d = timezone.make_aware(d, timezone.get_default_timezone())
event_structure["modified_date"] = d
del event_structure["last_modified"]
else:
event_structure["modified_date"] = None
if "start_time" in event_structure:
event_structure["start_time"] = time.fromisoformat(event_structure["start_time"])
if "end_time" in event_structure:
event_structure["end_time"] = time.fromisoformat(event_structure["end_time"])
if "location" not in event_structure or event_structure["location"] is None:
event_structure["location"] = ""
if "description" in event_structure and event_structure["description"] is None:
event_structure["description"] = ""
if "recurrences" in event_structure and event_structure["recurrences"] is not None:
event_structure["recurrences"] = recurrence.deserialize(event_structure["recurrences"])
event_structure["recurrences"].exdates = [e.replace(hour=0, minute=0, second=0) for e in event_structure["recurrences"].exdates]
event_structure["recurrences"].rdates = [e.replace(hour=0, minute=0, second=0) for e in event_structure["recurrences"].rdates]
else:
event_structure["recurrences"] = None
if import_source is not None:
event_structure["import_sources"] = [import_source]
return Event(**event_structure)
def find_similar_events(self):
start_time_test = Q(start_time=self.start_time)
if self.start_time is not None:
# convert str start_time to time
if isinstance(self.start_time, str):
self.start_time = time.fromisoformat(self.start_time)
interval = (time(self.start_time.hour - 1, self.start_time.minute) if self.start_time.hour >= 1 else time(0, 0),
time(self.start_time.hour + 1, self.start_time.minute) if self.start_time.hour < 23 else time(23, 59))
start_time_test = start_time_test | Q(start_time__range=interval)
return Event.objects.annotate(similarity_title=TrigramSimilarity("title", self.title)). \
annotate(similarity_location=TrigramSimilarity("location", self.location)). \
filter(Q(start_day=self.start_day) & start_time_test & Q(similarity_title__gt=0.5) & Q(similarity_title__gt=0.3))
def find_same_events_by_uuid(self):
return None if self.uuids is None or len(self.uuids) == 0 else Event.objects.filter(uuids__contains=self.uuids)
def split_uuid(uuid):
els = uuid.split(':')
if len(els) == 1:
return ":".join(els[0:-1]), 0
else:
if els[-1].isdigit():
return ":".join(els[0:-1]), int(els[-1])
else:
return ":".join(els), 0
def is_ancestor_uuid(uuid1, uuid2):
root1, version1 = Event.split_uuid(uuid1)
root2, version2 = Event.split_uuid(uuid2)
return root1 == root2 and version1 < version2
def is_ancestor_by_uuid(self, event):
if self.uuids is None or event.uuids is None:
return False
for s_uuid in self.uuids:
for e_uuid in event.uuids:
if Event.is_ancestor_uuid(s_uuid, e_uuid):
return True
return False
def get_possibly_duplicated(self):
if self.possibly_duplicated is None:
return []
else:
return Event.objects.filter(possibly_duplicated=self.possibly_duplicated).exclude(pk=self.pk)
def set_possibly_duplicated(self, events):
# get existing groups
groups = list(set([e.possibly_duplicated for e in events] + [self.possibly_duplicated]))
groups = [g for g in groups if g is not None]
# do we have to create a new group?
if len(groups) == 0:
group = DuplicatedEvents.objects.create()
else:
# otherwise merge existing groups
group = DuplicatedEvents.merge_groups(groups)
group.save()
# set the possibly duplicated group for the current object
self.possibly_duplicated = group
# and for the other events
for e in events:
e.possibly_duplicated = group
# finally update all events (including current if already created)
elist = list(events) + ([self] if self.pk is not None else [])
Event.objects.bulk_update(elist, fields=["possibly_duplicated"])
def data_fields(all=False):
if all:
result = ["category"]
else:
result = []
result += ["title", "location", "start_day", "start_time", "end_day", "end_time", "description", "image"]
if all:
result += ["local_image"]
result += ["image_alt", "reference_urls", "recurrences"]
if all:
result += ["tags"]
return result
def same_event_by_data(self, other):
for attr in Event.data_fields():
if str(getattr(self, attr)) != str(getattr(other, attr)):
return False
return True
def find_same_event_by_data_in_list(self, events):
return [e for e in events if self.same_event_by_data(e)]
def find_last_imported_not_modified(events):
events = [e for e in events if e.imported_date is not None and (e.modified_date is None or e.modified_date <= e.imported_date)]
if len(events) == 0:
return None
else:
events.sort(key=lambda e: e.imported_date, reverse=True)
return events[0]
def update(self, other):
# TODO: what about category, tags?
# set attributes
for attr in Event.data_fields():
setattr(self, attr, getattr(other, attr))
# adjust modified date if required
if other.modified_date and self.modified_date < other.modified_date:
self.modified_date = other.modified_date
# set status according to the input status
if other.status is not None:
self.status = other.status
# add a possible missing uuid
if self.uuids is None:
self.uuids = []
for uuid in other.uuids:
if not uuid in self.uuids:
self.uuids.append(uuid)
# Limitation: the given events should not be considered similar one to another...
def import_events(events, remove_missing_from_source=None):
to_import = []
to_update = []
min_date = timezone.now().date()
max_date = None
uuids = set()
# for each event, check if it's a new one, or a one to be updated
for event in events:
sdate = date.fromisoformat(event.start_day)
if event.end_day:
edate = date.fromisoformat(event.end_day)
else:
edate = sdate
if min_date is None or min_date > sdate:
min_date = sdate
if max_date is None or max_date < sdate:
max_date = sdate
if max_date is None or (event.end_day is not None and max_date < edate):
max_date = edate
if event.uuids and len(event.uuids) > 0:
uuids |= set(event.uuids)
# imported events should be updated
event.set_in_importation_process()
event.prepare_save()
# check if the event has already be imported (using uuid)
same_events = event.find_same_events_by_uuid()
if len(same_events) != 0:
# check if one event has been imported and not modified in this list
same_imported = Event.find_last_imported_not_modified(same_events)
if same_imported:
# if this event exists, it will be updated with new data only if the data is fresher
if same_imported.modified_date < event.modified_date:
same_imported.update(event)
same_imported.set_in_importation_process()
same_imported.prepare_save()
to_update.append(same_imported)
else:
# otherwise, the new event possibly a duplication of the others.
event.set_possibly_duplicated(same_events)
# it will be imported
to_import.append(event)
else:
# if uuid is unique (or not available), check for similar events
similar_events = event.find_similar_events()
# if it exists similar events, add this relation to the event
if len(similar_events) != 0:
# check if an event from the list is exactly the same as the new one (using data)
same_events = event.find_same_event_by_data_in_list(similar_events)
if same_events is not None and len(same_events) > 0:
# merge with the first one
same_events[0].update(event)
same_events[0].set_in_importation_process()
same_events[0].prepare_save()
to_update.append(same_events[0])
else:
# the event is possibly a duplication of the others
event.set_possibly_duplicated(similar_events)
to_import.append(event)
else:
# import this new event
to_import.append(event)
# then import all the new events
imported = Event.objects.bulk_create(to_import)
nb_updated = Event.objects.bulk_update(to_update, fields = Event.data_fields() + ["imported_date", "modified_date", "uuids", "status"])
nb_draft = 0
if remove_missing_from_source is not None and max_date is not None:
# events that are missing from the import but in database are turned into drafts
# only if they are in the future
in_interval = Event.objects.filter(((Q(end_day__isnull=True) & Q(start_day__gte=min_date) & Q(start_day__lte=max_date)) |
(Q(end_day__isnull=False) & ~(Q(start_day__gt=max_date) | Q(end_day__lt=min_date)))) & Q(import_sources__contains=[remove_missing_from_source]) & Q(status=Event.STATUS.PUBLISHED) & Q(uuids__len__gt=0))
to_draft = []
for e in in_interval:
if len(uuids.intersection(e.uuids)) == 0:
e.status = Event.STATUS.TRASH
e.prepare_save()
to_draft.append(e)
nb_draft = Event.objects.bulk_update(to_draft, fields = ["status"])
return imported, nb_updated, nb_draft
def set_current_date(self, date):
self.current_date = date
def get_start_end_datetimes(self, day):
if self.start_day == day:
if self.start_time is None:
dtstart = datetime.combine(self.start_day, time().min)
else:
dtstart = datetime.combine(self.start_day, self.start_time)
else:
dtstart = datetime.combine(day, time().min)
end_day = self.get_consolidated_end_day()
if end_day == day:
if self.end_time is None:
dtend = datetime.combine(end_day, time().max)
else:
dtend = datetime.combine(end_day, self.end_time)
else:
dtend = datetime.combine(day, time().max)
return dtstart, dtend
def get_concurrent_events(self):
day = self.current_date if hasattr(self, "current_date") else self.start_day
day_events = CalendarDay(self.start_day).get_events()
return [e for e in day_events if e != self and self.is_concurrent_event(e, day) and e.status == Event.STATUS.PUBLISHED]
def is_concurrent_event(self, e, day):
dtstart, dtend = self.get_start_end_datetimes(day)
e_dtstart, e_dtend = e.get_start_end_datetimes(day)
return (dtstart <= e_dtstart <= dtend) or (e_dtstart <= dtstart <= e_dtend)
class ContactMessage(models.Model):
class Meta:
verbose_name = _('Contact message')
verbose_name_plural = _('Contact messages')
subject = models.CharField(verbose_name=_('Subject'), help_text=_('The subject of your message'), max_length=512)
name = models.CharField(verbose_name=_('Name'), help_text=_('Your name'), max_length=512, blank=True, null=True)
email = models.EmailField(verbose_name=_('Email address'), help_text=_('Your email address'), max_length=254, blank=True, null=True)
message = RichTextField(verbose_name=_('Message'), help_text=_('Your message'))
date = models.DateTimeField(auto_now_add=True)
closed = models.BooleanField(verbose_name=_('Closed'), help_text=_('this message has been processed and no longer needs to be handled'), default=False)
comments = RichTextField(verbose_name=_('Comments'), help_text=_('Comments on the message from the moderation team'), default="", blank=True, null=True)
def nb_open_contactmessages():
return ContactMessage.objects.filter(closed=False).count()
class RecurrentImport(models.Model):
class Meta:
verbose_name = _('Recurrent import')
verbose_name_plural = _('Recurrent imports')
permissions = [("run_recurrentimport", "Can run a recurrent import")]
class PROCESSOR(models.TextChoices):
ICAL = "ical", _("ical")
ICALNOBUSY = "icalnobusy", _("ical no busy")
ICALNOVC = "icalnovc", _("ical no VC")
LACOOPE = "lacoope", _('lacoope.org')
LACOMEDIE = "lacomedie", _('la comédie')
LEFOTOMAT = "lefotomat", _('le fotomat')
LAPUCEALOREILLE = "lapucealoreille", _('la puce à l''oreille')
class DOWNLOADER(models.TextChoices):
SIMPLE = "simple", _("simple")
CHROMIUMHEADLESS = "chromium headless", _("Headless Chromium")
class RECURRENCE(models.TextChoices):
DAILY = "daily", _("daily"),
WEEKLY = "weekly", _("weekly")
name = models.CharField(verbose_name=_('Name'), help_text=_('Recurrent import name. Be careful to choose a name that is easy to understand, as it will be public and displayed on the site''s About page.'), max_length=512, default="")
processor = models.CharField(_("Processor"), max_length=20, choices=PROCESSOR.choices, default=PROCESSOR.ICAL)
downloader = models.CharField(_("Downloader"), max_length=20, choices=DOWNLOADER.choices, default=DOWNLOADER.SIMPLE)
recurrence = models.CharField(_("Import recurrence"), max_length=10, choices=RECURRENCE.choices, default=RECURRENCE.DAILY)
source = models.URLField(verbose_name=_('Source'), help_text=_("URL of the source document"), max_length=1024)
browsable_url = models.URLField(verbose_name=_('Browsable url'), help_text=_("URL of the corresponding document that will be shown to visitors."), max_length=1024, blank=True, null=True)
defaultPublished = models.BooleanField(verbose_name=_('Published'), help_text=_('Status of each imported event (published or draft)'), default=True)
defaultLocation = models.CharField(verbose_name=_('Location'), help_text=_('Address for each imported event'), max_length=512, null=True, blank=True)
defaultCategory = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category of each imported event'), default=Category.get_default_category_id(), on_delete=models.SET_DEFAULT)
defaultTags = ArrayField(models.CharField(max_length=64), verbose_name=_('Tags for each imported event'), help_text=_("A list of tags that describe each imported event."), blank=True, null=True)
def nb_imports(self):
return BatchImportation.objects.filter(recurrentImport=self).count()
def nb_events(self):
return Event.objects.filter(import_sources__contains=[self.source]).count()
def get_absolute_url(self):
return reverse("view_rimport", kwargs={"pk": self.pk})
def last_import(self):
events = BatchImportation.objects.filter(recurrentImport=self).order_by("-created_date")
return events[0]
class BatchImportation(models.Model):
class STATUS(models.TextChoices):
RUNNING = "running", _("Running")
CANCELED = "canceled", _("Canceled")
SUCCESS = "success", _("Success")
FAILED = "failed", _("Failed")
class Meta:
verbose_name = _('Batch importation')
verbose_name_plural = _('Batch importations')
permissions = [("run_batchimportation", "Can run a batch importation")]
created_date = models.DateTimeField(auto_now_add=True)
recurrentImport = models.ForeignKey(RecurrentImport, verbose_name=_('Recurrent import'), help_text=_('Reference to the recurrent import processing'), blank=True, null=True, on_delete=models.SET_NULL, editable=False)
status = models.CharField(_("Status"), max_length=20, choices=STATUS.choices, default=STATUS.RUNNING)
error_message = models.CharField(verbose_name=_('Error message'), max_length=512, blank=True, null=True)
nb_initial = models.PositiveIntegerField(verbose_name=_('Number of collected events'), default=0)
nb_imported = models.PositiveIntegerField(verbose_name=_('Number of imported events'), default=0)
nb_updated = models.PositiveIntegerField(verbose_name=_('Number of updated events'), default=0)
nb_removed = models.PositiveIntegerField(verbose_name=_('Number of removed events'), default=0)
celery_id = models.CharField(max_length=128, default="")
class CategorisationRule(models.Model):
weight = models.IntegerField(verbose_name=_('Weight'), help_text=_("The lower is the weight, the earlier the filter is applied"), default=0)
category = models.ForeignKey(Category, verbose_name=_('Category'), help_text=_('Category applied to the event'), on_delete=models.CASCADE)
title_contains = models.CharField(verbose_name=_('Contained in the title'), help_text=_('Text contained in the event title'), max_length=512, blank=True, null=True)
title_exact = models.BooleanField(verbose_name=_('Exact title extract'), help_text=_("If checked, the extract will be searched for in the title using the exact form (capitals, accents)."), default=False)
description_contains = models.CharField(verbose_name=_('Contained in the description'), help_text=_('Text contained in the description'), max_length=512, blank=True, null=True)
desc_exact = models.BooleanField(verbose_name=_('Exact description extract'), help_text=_("If checked, the extract will be searched for in the description using the exact form (capitals, accents)."), default=False)
location_contains = models.CharField(verbose_name=_('Contained in the location'), help_text=_('Text contained in the event location'), max_length=512, blank=True, null=True)
loc_exact = models.BooleanField(verbose_name=_('Exact location extract'), help_text=_("If checked, the extract will be searched for in the location using the exact form (capitals, accents)."), default=False)
class Meta:
verbose_name = _('Categorisation rule')
verbose_name_plural = _('Categorisation rules')
permissions = [("apply_categorisationrules", "Apply a categorisation rule")]
# all rules are applied, starting from the first to the last
def apply_rules(event):
rules = CategorisationRule.objects.all().order_by("weight", "pk")
for rule in rules:
if rule.match(event):
event.category = rule.category
return 1
return 0
def match_rules(event):
rules = CategorisationRule.objects.all().order_by("weight", "pk")
for rule in rules:
if rule.match(event):
return rule.category
return None
def match(self, event):
if self.description_contains and self.description_contains != "":
if self.desc_exact:
result = self.description_contains in event.description
else:
result = remove_accents(self.description_contains).lower() in remove_accents(event.description).lower()
if not result:
return False
if self.title_contains and self.title_contains != "":
if self.title_exact:
result = self.title_contains in event.title
else:
result = remove_accents(self.title_contains).lower() in remove_accents(event.title).lower()
if not result:
return False
if self.location_contains and self.location_contains != "":
if self.loc_exact:
result = self.location_contains in event.location
else:
result = remove_accents(self.location_contains).lower() in remove_accents(event.location).lower()
if not result:
return False
return True
class ModerationQuestion(models.Model):
question = models.CharField(verbose_name=_('Question'), help_text=_('Text that will be shown to moderators'), max_length=512, unique=True)
class Meta:
verbose_name = _('Moderation question')
verbose_name_plural = _('Moderation questions')
permissions = [("use_moderation_question", "Can use a moderation question to tag an event")]
def __str__(self):
char_limit = 30
return (self.question[:char_limit] + "...") if char_limit < len(self.question) else self.question
def get_absolute_url(self):
return reverse("view_mquestion", kwargs={"pk": self.pk})
def complete_id(self):
return "question_" + str(self.pk)
class ModerationAnswer(models.Model):
question = models.ForeignKey(ModerationQuestion, related_name="answers", verbose_name=_('Question'), help_text=_('Associated question from moderation'), on_delete=models.CASCADE)
answer = models.CharField(verbose_name=_('Answer'), help_text=_('Text that will be shown to moderators'), max_length=512)
adds_tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Adds tags'), help_text=_("A list of tags that will be added if you choose this answer."), blank=True, null=True)
removes_tags = ArrayField(models.CharField(max_length=64), verbose_name=_('Removes tags'), help_text=_("A list of tags that will be removed if you choose this answer."), blank=True, null=True)
def complete_id(self):
return "answer_" + str(self.question.pk) + '_' + str(self.pk)
def html_description(self):
result = self.answer + '<br /><span class="helptext">'
if self.adds_tags:
result += ' '.join(['<span role="button" class="small-cat">' + a + '</span>' for a in self.adds_tags])
if self.removes_tags:
result += ' '.join(['<span role="button" class="small-cat strike">' + a + '</span>' for a in self.removes_tags])
result += "</span>"
return mark_safe(result)
def valid_event(self, event):
if event.tags:
if self.adds_tags:
for t in self.adds_tags:
if t not in event.tags:
return False
if self.removes_tags:
for t in self.removes_tags:
if t in event.tags:
return False
return True
else:
return not self.adds_tags or len(self.adds_tags) == 0
def apply_answer(self, event):
if not self.adds_tags:
self.adds_tags = []
if not self.removes_tags:
self.removes_tags = []
logger.info('on applique la réponse ' + self.answer)
if event.tags:
event.tags = list((set(event.tags) | set(self.adds_tags)) - set(self.removes_tags))
else:
event.tags = self.adds_tags