Потеря/незапуск таска в Celery (Django)

Рейтинг: 0Ответов: 0Опубликовано: 15.02.2023

Всем привет!

В проекте на Django есть одна модель Table (по сути - таблица), в зависимости от значения поля task_type при создании записи в таблице запускается один из двух обработчиков (PeriodicTask) и планирует запуск таска (@shared_task) с набором входных параметров, причем время запуска тасков date_of_start очень часто является одинаковым для различных экземпляров @shared_task.

При просмотре выполнения этих тасков в Flower я вижу, что часть тасков просто не запускаются, а в джанговской таблице django_celery_beat_periodictask видно время, когда они якобы запускаются. Очевидно, что результата запуска и выполнения этих тасков тоже нет. Запуск всех тасков и успешное выполнение с явным результатом критичны, потому что связаны со сбором оплаты за услуги и с временем выполнения тасков

Структура кода следующая:

class Table(models.Model):
    class Meta:
        db_table = "table"

    task_type = models.IntegerField(choices=DumpTaskTypeChoice.choices, default=DumpTaskTypeChoice.lock)
    date_of_start = models.DateTimeField(verbose_name='Zaplanowana data')
    input_data = models.JSONField(default=default_input_data)
    is_succeded = models.IntegerField(choices=TaskStatusChoice.choices, default=TaskStatusChoice.undefined)

    objects = TableManager()


class TableManager(models.Manager):
    def create(self, *args, **kwargs):
        result = super().create(*args, **kwargs)
        parsing_workers(result)
        return result


def dupms_parsing_workers(result):
    if result.task_type == 1:
        worker_1(record=result.pk)
    if result.task_type == 2:
        worker_2(record=result.pk)

Обработчики имеют схожий код, поэтому код на них общий:

from django_celery_beat.models import PeriodicTask

def worker_i(**kwargs):
    from App.models import Table

    record_id = kwargs.get('record', None)
    table_record = None if not record_id else Table.objects.filter(pk=record_id)
    if not table_record or not timetable_record.exists():
        return
    worker_datetime = table_record.date_of_start

    ### Вычисление набора входных параметров full_set для таска ###

    return PeriodicTask.objects.create(**{
        "task": "task_i",
        "name": f'name {random.random()}',
        "clocked": ClockedSchedule.objects.create(clocked_time=worker_datetime),
        "enabled": True,
        "kwargs": full_set,
        "one_off": True
    })



@shared_task(bind=True, default_retry_delay=30, max_retries=50)
def task_i(*args, **kwargs):
    ### обработка данных

Брокером является RabbitMQ Server¶. В какие настройки брокера и Celery нужно смотреть, чтобы одновременно могли выполняться по крайней мере 50-100 тасков с одним и тем же временем запуска date_of_start?

Ответы

Ответов пока нет.