Pour assurer la stabilité des imports nocturnes, on impose qu'ils se passent les

uns après les autres.

Cf #139
This commit is contained in:
Jean-Marie Favreau 2024-09-12 23:54:37 +02:00
parent 2637132a28
commit c0fc2c97f5
2 changed files with 30 additions and 23 deletions

View File

@ -1,7 +1,7 @@
import os import os
import json import json
from celery import Celery, Task from celery import Celery, Task, chain
from celery.schedules import crontab from celery.schedules import crontab
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from celery.exceptions import MaxRetriesExceededError from celery.exceptions import MaxRetriesExceededError
@ -182,13 +182,19 @@ def run_recurrent_import_internal(rimport, downloader, req_id):
logger.error(e) logger.error(e)
close_import_task(req_id, False, e, importer) close_import_task(req_id, False, e, importer)
return
@app.task(base=ChromiumTask, bind=True) @app.task(base=ChromiumTask, bind=True)
def run_recurrent_import(self, pk): def run_recurrent_import(self, pklist):
from agenda_culturel.models import RecurrentImport from agenda_culturel.models import RecurrentImport
if isinstance(pklist, list):
pk = pklist[0]
is_list = True
else:
is_list = False
pk = pklist
# get the recurrent import # get the recurrent import
rimport = RecurrentImport.objects.get(pk=pk) rimport = RecurrentImport.objects.get(pk=pk)
@ -207,21 +213,25 @@ def run_recurrent_import(self, pk):
if rimport.downloader in [RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS, RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE]: if rimport.downloader in [RecurrentImport.DOWNLOADER.CHROMIUMHEADLESS, RecurrentImport.DOWNLOADER.CHROMIUMHEADLESSPAUSE]:
with memcache_chromium_lock(self.app.oid) as acquired: with memcache_chromium_lock(self.app.oid) as acquired:
if acquired: if acquired:
return run_recurrent_import_internal(rimport, downloader, self.request.id) run_recurrent_import_internal(rimport, downloader, self.request.id)
return pklist[1:] if is_list else True
else: else:
return run_recurrent_import_internal(rimport, downloader, self.request.id) run_recurrent_import_internal(rimport, downloader, self.request.id)
return pklist[1:] if is_list else True
try: try:
# if chromium is locked, we wait before retrying # if chromium is locked, we wait before retrying
raise self.retry(countdown=120) raise self.retry(countdown=120)
except MaxRetriesExceededError as e: except MaxRetriesExceededError as e:
logger.error(e) logger.error(e)
close_import_task(req_id, False, e, importer) close_import_task(self.request.id, False, e, importer)
return pklist[1:] if is_list else False
def run_recurrent_imports_from_list(pklist):
tasks = chain(run_recurrent_import.s(pklist) if i == 0 else run_recurrent_import.s() for i in range(len(pklist)))
tasks.delay()
@app.task(bind=True) @app.task(bind=True)
def daily_imports(self): def daily_imports(self):
@ -230,10 +240,9 @@ def daily_imports(self):
logger.info("Everyday imports") logger.info("Everyday imports")
imports = RecurrentImport.objects.filter( imports = RecurrentImport.objects.filter(
recurrence=RecurrentImport.RECURRENCE.DAILY recurrence=RecurrentImport.RECURRENCE.DAILY
) ).order_by("pk")
for imp in imports: run_recurrent_imports_from_list([imp.pk for imp in imports])
run_recurrent_import.delay(imp.pk)
@app.task(bind=True) @app.task(bind=True)
@ -241,10 +250,9 @@ def run_all_recurrent_imports(self):
from agenda_culturel.models import RecurrentImport from agenda_culturel.models import RecurrentImport
logger.info("Run all imports") logger.info("Run all imports")
imports = RecurrentImport.objects.all() imports = RecurrentImport.objects.all().order_by("pk")
for imp in imports: run_recurrent_imports_from_list([imp.pk for imp in imports])
run_recurrent_import.delay(imp.pk)
@app.task(bind=True) @app.task(bind=True)
@ -252,11 +260,9 @@ def run_all_recurrent_imports_failed(self):
from agenda_culturel.models import RecurrentImport, BatchImportation from agenda_culturel.models import RecurrentImport, BatchImportation
logger.info("Run only failed imports") logger.info("Run only failed imports")
imports = RecurrentImport.objects.all() imports = RecurrentImport.objects.all().order_by("pk")
for imp in imports: run_recurrent_imports_from_list([imp.pk for imp in imports if imp.last_import().status == BatchImportation.STATUS.FAILED])
if imp.last_import().status == BatchImportation.STATUS.FAILED:
run_recurrent_import.delay(imp.pk)
@app.task(bind=True) @app.task(bind=True)
@ -266,10 +272,9 @@ def weekly_imports(self):
logger.info("Weekly imports") logger.info("Weekly imports")
imports = RecurrentImport.objects.filter( imports = RecurrentImport.objects.filter(
recurrence=RecurrentImport.RECURRENCE.WEEKLY recurrence=RecurrentImport.RECURRENCE.WEEKLY
) ).order_by("pk")
for imp in imports: run_recurrent_imports_from_list([imp.pk for imp in imports])
run_recurrent_import.delay(imp.pk)
@app.task(base=ChromiumTask, bind=True) @app.task(base=ChromiumTask, bind=True)
def import_events_from_url(self, url, cat): def import_events_from_url(self, url, cat):

View File

@ -44,7 +44,9 @@
<td> <td>
{% if obj.nb_imports > 0 %} {% if obj.nb_imports > 0 %}
{% with imp=obj.last_import %} {% with imp=obj.last_import %}
<span{% if imp.status == "failed" %} data-tooltip="{{ imp.error_message }}"{% endif %}>{{ imp.status }}</span> <span{% if imp.status == "failed" %} data-tooltip="{{ imp.error_message }}"{% endif %}>
{% if imp.status == "running" %}<em>{{ imp.status }}</em>{% else %}{{ imp.status }}{% endif %}
</span>
{% endwith %} {% endwith %}
{% endif %} {% endif %}
</td> </td>