Improve import handling to avoid parallel imports

with Chromium (notably, difficulties with Facebook)
Jean-Marie Favreau 2024-09-07 17:09:25 +02:00
parent f38d4bee97
commit f9038a03f4
3 changed files with 120 additions and 57 deletions
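
The change closely follows the "ensure a task is only executed one at a time" recipe from the Celery documentation: cache.add() is atomic (it fails if the key already exists), so a cache entry acts as a cross-worker mutex around every Chromium-backed download. Both guarded tasks take the shape sketched below (simplified; some_chromium_task and do_chromium_work are hypothetical placeholders, the rest comes from the diff):

    @app.task(base=ChromiumTask, bind=True)
    def some_chromium_task(self):
        # run the body only while holding the cross-worker Chromium lock
        with memcache_chromium_lock(self.app.oid) as acquired:
            if acquired:
                return do_chromium_work()
        # another worker is driving Chromium: re-queue and retry in 30 s
        raise self.retry(countdown=30)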

View File

@@ -4,6 +4,10 @@ import json
 from celery import Celery, Task
 from celery.schedules import crontab
 from celery.utils.log import get_task_logger
+import time as time_
+from contextlib import contextmanager
+
 from .import_tasks.downloader import *
 from .import_tasks.extractor import *
@@ -18,6 +22,8 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"agenda_culturel.settings.{APP_
 app = Celery("agenda_culturel")

+from django.core.cache import cache
+
 logger = get_task_logger(__name__)
@@ -30,6 +36,26 @@ app.config_from_object("django.conf:settings", namespace="CELERY")
 # Load task modules from all registered Django apps.
 app.autodiscover_tasks()

+LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes
+
+@contextmanager
+def memcache_chromium_lock(oid):
+    lock_id = "chromium-lock"
+    timeout_at = time_.monotonic() + LOCK_EXPIRE - 3
+    # cache.add fails if the key already exists
+    status = cache.add(lock_id, oid, LOCK_EXPIRE)
+    try:
+        yield status
+    finally:
+        # memcache delete is very slow, but we have to use it to take
+        # advantage of using add() for atomic locking
+        if time_.monotonic() < timeout_at and status:
+            # don't release the lock if we exceeded the timeout
+            # to lessen the chance of releasing an expired lock
+            # owned by someone else
+            # also don't release the lock if we didn't acquire it
+            cache.delete(lock_id)
+
 def close_import_task(taskid, success, error_message, importer):
     from agenda_culturel.models import BatchImportation
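
Two guards in the finally block matter here: the key is deleted only if this caller actually acquired the lock (status) and only while the expiry window is still open, so a worker that overran LOCK_EXPIRE cannot delete a lock that the cache has already expired and handed to another worker. Standalone usage would look like this (a hypothetical snippet; the import path is an assumption, since the file name is not shown in this view):

    from agenda_culturel.celery import app, memcache_chromium_lock

    with memcache_chromium_lock(app.oid) as acquired:
        print("Chromium lock held" if acquired else "Chromium busy, try again later")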
@@ -84,33 +110,21 @@ class ChromiumTask(Task):
         return self._chm

-@app.task(base=ChromiumTask, bind=True)
-def run_recurrent_import(self, pk):
+def run_recurrent_import_internal(rimport, downloader, req_id):
     from agenda_culturel.models import RecurrentImport, BatchImportation
     from .db_importer import DBImporterEvents

-    logger.info("Run recurrent import: {}".format(self.request.id))
+    logger.info("Run recurrent import: {}".format(req_id))

-    # get the recurrent import
-    rimport = RecurrentImport.objects.get(pk=pk)
     # create a batch importation
-    importation = BatchImportation(recurrentImport=rimport, celery_id=self.request.id)
+    importation = BatchImportation(recurrentImport=rimport, celery_id=req_id)
     # save batch importation
     importation.save()

     # create an importer
-    importer = DBImporterEvents(self.request.id)
-
-    # prepare downloading and extracting processes
-    if rimport.downloader == RecurrentImport.DOWNLOADER.SIMPLE:
-        downloader = SimpleDownloader()
-    elif rimport.downloader == RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS:
-        downloader = self.chromiumDownloader
-        downloader.pause = False
-    else:
-        downloader = self.chromiumDownloader
-        downloader.pause = True
+    importer = DBImporterEvents(req_id)

     if rimport.processor == RecurrentImport.PROCESSOR.ICAL:
         extractor = ICALExtractor()
@@ -164,10 +178,44 @@ def run_recurrent_import(self, pk):
         success, error_message = importer.import_events(json_events)

         # finally, close task
-        close_import_task(self.request.id, success, error_message, importer)
+        close_import_task(req_id, success, error_message, importer)
     except Exception as e:
         logger.error(e)
-        close_import_task(self.request.id, False, e, importer)
+        close_import_task(req_id, False, e, importer)
+    return
+
+
+@app.task(base=ChromiumTask, bind=True)
+def run_recurrent_import(self, pk):
+    from agenda_culturel.models import RecurrentImport
+
+    # get the recurrent import
+    rimport = RecurrentImport.objects.get(pk=pk)
+
+    # prepare downloading and extracting processes
+    if rimport.downloader == RecurrentImport.DOWNLOADER.SIMPLE:
+        downloader = SimpleDownloader()
+    elif rimport.downloader == RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS:
+        downloader = self.chromiumDownloader
+        downloader.pause = False
+    else:
+        downloader = self.chromiumDownloader
+        downloader.pause = True
+
+    # only one thread using Chromium can run at a time,
+    # to prevent errors (including strange Facebook errors)
+    if rimport.downloader in [RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS, RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE]:
+        with memcache_chromium_lock(self.app.oid) as acquired:
+            if acquired:
+                return run_recurrent_import_internal(rimport, downloader, self.request.id)
+    else:
+        return run_recurrent_import_internal(rimport, downloader, self.request.id)
+
+    # if chromium is locked, we wait 30 seconds before retrying
+    raise self.retry(countdown=30)

 @app.task(bind=True)
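
Note that self.retry() raises celery.exceptions.Retry: the raise ends the current run and re-queues the task with the same pk after the 30 second countdown. Celery's default max_retries is 3, so if the lock can stay held for several minutes, a variant without a retry limit could be considered (hypothetical, not part of this commit):

    # max_retries=None lifts Celery's default cap of 3 retries
    @app.task(base=ChromiumTask, bind=True, max_retries=None)
    def run_recurrent_import(self, pk):
        ...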
@@ -224,53 +272,61 @@ def import_events_from_url(self, url, cat):
     from agenda_culturel.models import RecurrentImport, BatchImportation
     from agenda_culturel.models import Event, Category

-    logger.info("URL import: {}".format(self.request.id))
+    with memcache_chromium_lock(self.app.oid) as acquired:
+        if acquired:
+            logger.info("URL import: {}".format(self.request.id))

-    # clean url
-    url = Extractor.clean_url(url)
+            # clean url
+            url = Extractor.clean_url(url)

-    # we check if the url is known
-    existing = Event.objects.filter(uuids__contains=[url])
-    # if it's unknown
-    if len(existing) == 0:
-        # create an importer
-        importer = DBImporterEvents(self.request.id)
+            # we check if the url is known
+            existing = Event.objects.filter(uuids__contains=[url])
+            # if it's unknown
+            if len(existing) == 0:
+                # create an importer
+                importer = DBImporterEvents(self.request.id)

-        # create a batch importation
-        importation = BatchImportation(url_source=url, celery_id=self.request.id)
-        # save batch importation
-        importation.save()
+                # create a batch importation
+                importation = BatchImportation(url_source=url, celery_id=self.request.id)
+                # save batch importation
+                importation.save()

-        try:
-            ## create loader
-            u2e = URL2Events(ChromiumHeadlessDownloader(), single_event=True)
-            # set default values
-            values = {}
-            if cat is not None:
-                values = {"category": cat}
+                try:
+                    ## create loader
+                    u2e = URL2Events(ChromiumHeadlessDownloader(), single_event=True)
+                    # set default values
+                    values = {}
+                    if cat is not None:
+                        values = {"category": cat}

-            # get event
-            events = u2e.process(
-                url, published=False, default_values=values
-            )
+                    # get event
+                    events = u2e.process(
+                        url, published=False, default_values=values
+                    )

-            if events:
-                # convert it to json
-                json_events = json.dumps(events, default=str)
+                    if events:
+                        # convert it to json
+                        json_events = json.dumps(events, default=str)

-                # import events (from json)
-                success, error_message = importer.import_events(json_events)
+                        # import events (from json)
+                        success, error_message = importer.import_events(json_events)

-                # finally, close task
-                close_import_task(self.request.id, success, error_message, importer)
-            else:
-                close_import_task(self.request.id, False, "Cannot find any event", importer)
-        except Exception as e:
-            logger.error(e)
-            close_import_task(self.request.id, False, e, importer)
+                        # finally, close task
+                        close_import_task(self.request.id, success, error_message, importer)
+                    else:
+                        close_import_task(self.request.id, False, "Cannot find any event", importer)
+                except Exception as e:
+                    logger.error(e)
+                    close_import_task(self.request.id, False, e, importer)
+            return
+
+    # if chromium is locked, we wait 30 seconds before retrying
+    raise self.retry(countdown=30)

 @app.task(base=ChromiumTask, bind=True)
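
From the caller's side nothing changes: both tasks are queued as before, and the lock/retry logic stays inside the workers (hypothetical call sites):

    run_recurrent_import.delay(rimport.pk)
    import_events_from_url.delay(url, cat)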

View File

@@ -3,6 +3,7 @@
 {% block title %}{% block og_title %}Importations récurrentes{% endblock %}{% endblock %}
 {% load utils_extra %}
+{% load rimports_extra %}
 {% load cat_extra %}

 {% block entete_header %}
     {% css_categories %}
@@ -13,8 +14,10 @@
     <article>
         <header>
             <div class="slide-buttons">
-                <a href="{% url 'run_all_rimports' %}" role="button">Importer tout {% picto_from_name "play-circle" %}</a>
-                <a href="{% url 'run_all_rimports_failed' %}" role="button">Relancer les imports échoués {% picto_from_name "play-circle" %}</a>
+                <a href="{% url 'run_all_rimports' %}" role="button">Exécuter tout {% picto_from_name "play-circle" %}</a>
+                {% if has_failed_rimports %}
+                    <a href="{% url 'run_all_rimports_failed' %}" role="button">Relancer les imports échoués {% picto_from_name "play-circle" %}</a>
+                {% endif %}
                 <a href="{% url 'add_rimport'%}" role="button">Ajouter {% picto_from_name "plus-circle" %}</a>
             </div>
             <h1>Importations récurrentes</h1>

View File

@@ -12,6 +12,10 @@ from .utils_extra import picto_from_name

 register = template.Library()

+@register.simple_tag
+def has_failed_rimports():
+    return BatchImportation.objects.filter(status=BatchImportation.STATUS.FAILED).count() != 0
+
 @register.simple_tag
 def show_badge_failed_rimports(placement="top"):
     newest = BatchImportation.objects.filter(recurrentImport=OuterRef("pk")).order_by(
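
One design note on the new tag: count() != 0 makes the database count every failed importation. Django's exists() expresses the same boolean test and typically compiles to a cheaper query with LIMIT 1 (an equivalent alternative, not what this commit uses):

    @register.simple_tag
    def has_failed_rimports():
        # exists() stops at the first matching row instead of counting them all
        return BatchImportation.objects.filter(
            status=BatchImportation.STATUS.FAILED
        ).exists()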