Потеря/незапуск таска в Celery (Django)
Всем привет!
В проекте на Django есть одна модель Table
(по сути - таблица), в зависимости от значения поля task_type
при создании записи в таблице запускается один из двух обработчиков (PeriodicTask
) и планирует запуск таска (@shared_task) с набором входных параметров, причем время запуска тасков date_of_start
очень часто является одинаковым для различных экземпляров @shared_task.
При просмотре выполнения этих тасков в Flower я вижу, что часть тасков просто не запускаются, а в джанговской таблице django_celery_beat_periodictask
видно время, когда они якобы запускаются. Очевидно, что результата запуска и выполнения этих тасков тоже нет. Запуск всех тасков и успешное выполнение с явным результатом критичны, потому что связаны со сбором оплаты за услуги и с временем выполнения тасков
Структура кода следующая:
class Table(models.Model):
class Meta:
db_table = "table"
task_type = models.IntegerField(choices=DumpTaskTypeChoice.choices, default=DumpTaskTypeChoice.lock)
date_of_start = models.DateTimeField(verbose_name='Zaplanowana data')
input_data = models.JSONField(default=default_input_data)
is_succeded = models.IntegerField(choices=TaskStatusChoice.choices, default=TaskStatusChoice.undefined)
objects = TableManager()
class TableManager(models.Manager):
def create(self, *args, **kwargs):
result = super().create(*args, **kwargs)
parsing_workers(result)
return result
def dupms_parsing_workers(result):
if result.task_type == 1:
worker_1(record=result.pk)
if result.task_type == 2:
worker_2(record=result.pk)
Обработчики имеют схожий код, поэтому код на них общий:
from django_celery_beat.models import PeriodicTask
def worker_i(**kwargs):
from App.models import Table
record_id = kwargs.get('record', None)
table_record = None if not record_id else Table.objects.filter(pk=record_id)
if not table_record or not timetable_record.exists():
return
worker_datetime = table_record.date_of_start
### Вычисление набора входных параметров full_set для таска ###
return PeriodicTask.objects.create(**{
"task": "task_i",
"name": f'name {random.random()}',
"clocked": ClockedSchedule.objects.create(clocked_time=worker_datetime),
"enabled": True,
"kwargs": full_set,
"one_off": True
})
@shared_task(bind=True, default_retry_delay=30, max_retries=50)
def task_i(*args, **kwargs):
### обработка данных
Брокером является RabbitMQ Server¶. В какие настройки брокера и Celery нужно смотреть, чтобы одновременно могли выполняться по крайней мере 50-100 тасков с одним и тем же временем запуска date_of_start
?