diff --git a/src/agenda_culturel/celery.py b/src/agenda_culturel/celery.py index 5de0dee..791977e 100644 --- a/src/agenda_culturel/celery.py +++ b/src/agenda_culturel/celery.py @@ -1,7 +1,7 @@ import os import json -from celery import Celery, Task +from celery import Celery, Task, chain from celery.schedules import crontab from celery.utils.log import get_task_logger from celery.exceptions import MaxRetriesExceededError @@ -182,13 +182,19 @@ def run_recurrent_import_internal(rimport, downloader, req_id): logger.error(e) close_import_task(req_id, False, e, importer) - return @app.task(base=ChromiumTask, bind=True) -def run_recurrent_import(self, pk): +def run_recurrent_import(self, pklist): from agenda_culturel.models import RecurrentImport + if isinstance(pklist, list): + pk = pklist[0] + is_list = True + else: + is_list = False + pk = pklist + # get the recurrent import rimport = RecurrentImport.objects.get(pk=pk) @@ -207,21 +213,25 @@ def run_recurrent_import(self, pk): if rimport.downloader in [RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS, RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE]: with memcache_chromium_lock(self.app.oid) as acquired: if acquired: - return run_recurrent_import_internal(rimport, downloader, self.request.id) + run_recurrent_import_internal(rimport, downloader, self.request.id) + return pklist[1:] if is_list else True else: - return run_recurrent_import_internal(rimport, downloader, self.request.id) + run_recurrent_import_internal(rimport, downloader, self.request.id) + return pklist[1:] if is_list else True try: # if chromium is locked, we wait before retrying raise self.retry(countdown=120) except MaxRetriesExceededError as e: logger.error(e) - close_import_task(req_id, False, e, importer) - - - + close_import_task(self.request.id, False, e, importer) + return pklist[1:] if is_list else False +def run_recurrent_imports_from_list(pklist): + + tasks = chain(run_recurrent_import.s(pklist) if i == 0 else run_recurrent_import.s() for i in range(len(pklist))) + tasks.delay() @app.task(bind=True) def daily_imports(self): @@ -230,10 +240,9 @@ def daily_imports(self): logger.info("Everyday imports") imports = RecurrentImport.objects.filter( recurrence=RecurrentImport.RECURRENCE.DAILY - ) + ).order_by("pk") - for imp in imports: - run_recurrent_import.delay(imp.pk) + run_recurrent_imports_from_list([imp.pk for imp in imports]) @app.task(bind=True) @@ -241,10 +250,9 @@ def run_all_recurrent_imports(self): from agenda_culturel.models import RecurrentImport logger.info("Run all imports") - imports = RecurrentImport.objects.all() + imports = RecurrentImport.objects.all().order_by("pk") - for imp in imports: - run_recurrent_import.delay(imp.pk) + run_recurrent_imports_from_list([imp.pk for imp in imports]) @app.task(bind=True) @@ -252,11 +260,9 @@ def run_all_recurrent_imports_failed(self): from agenda_culturel.models import RecurrentImport, BatchImportation logger.info("Run only failed imports") - imports = RecurrentImport.objects.all() + imports = RecurrentImport.objects.all().order_by("pk") - for imp in imports: - if imp.last_import().status == BatchImportation.STATUS.FAILED: - run_recurrent_import.delay(imp.pk) + run_recurrent_imports_from_list([imp.pk for imp in imports if imp.last_import().status == BatchImportation.STATUS.FAILED]) @app.task(bind=True) @@ -266,10 +272,9 @@ def weekly_imports(self): logger.info("Weekly imports") imports = RecurrentImport.objects.filter( recurrence=RecurrentImport.RECURRENCE.WEEKLY - ) + ).order_by("pk") - for imp in imports: - run_recurrent_import.delay(imp.pk) + run_recurrent_imports_from_list([imp.pk for imp in imports]) @app.task(base=ChromiumTask, bind=True) def import_events_from_url(self, url, cat): diff --git a/src/agenda_culturel/templates/agenda_culturel/rimports.html b/src/agenda_culturel/templates/agenda_culturel/rimports.html index e6c844d..a83b3a5 100644 --- a/src/agenda_culturel/templates/agenda_culturel/rimports.html +++ b/src/agenda_culturel/templates/agenda_culturel/rimports.html @@ -44,7 +44,9 @@