Наличие очередей ошибок в сельдерее

Есть ли в сельдерее способ, с помощью которого, если выполнение задачи не удается, я могу автоматически поместить ее в другую очередь.

Например, задача выполняется в очереди x, в случае исключения ставится в очередь в другую очередь с именем error_x.

Редактировать:

В настоящее время я использую celery==3.0.13 вместе с django 1.4, Rabbitmq в качестве брокера.

Иногда задача не выполняется. Есть ли способ в сельдерее добавлять сообщения в очередь ошибок и обрабатывать их позже.

Проблема при сбое задачи сельдерея заключается в том, что у меня нет доступа к имени очереди сообщений. Поэтому я не могу использовать self.retry повторную попытку, чтобы поместить ее в другую очередь ошибок.


person Vignesh    schedule 24.07.2014    source источник


Ответы (2)


Ну, вы не можете использовать механизм retry, если хотите перенаправить задачу в другую очередь. Из документов:

retry() можно использовать для повторного выполнения задачи, например, в случае исправимых ошибок.

Когда вы вызываете повторную попытку, он отправляет новое сообщение, используя тот же идентификатор задачи, и позаботится о том, чтобы сообщение было доставлено в ту же очередь, что и исходная задача.

Вам придется перезапустить себя и вручную направить его в желаемую очередь в случае возникновения любого исключения. Кажется, это хорошая работа для обратных вызовов ошибок.

Основная проблема заключается в том, что нам нужно получить имя задачи в обратном вызове ошибки, чтобы иметь возможность запустить ее. Также мы можем не захотеть добавлять обратный вызов каждый раз, когда запускаем задачу. Таким образом, декоратор был бы хорошим способом автоматически добавить правильный обратный вызов.

from functools import partial, wraps

import celery


@celery.shared_task
def error_callback(task_id, task_name, retry_queue, retry_routing_key):
    # We must retrieve the task object itself.
    # `tasks` is a dict of 'task_name': celery_task_object
    task = celery.current_app.tasks[task_name]
    # Re launch the task in specified queue.
    task.apply_async(queue=retry_queue, routing_key=retry_routing_key)


def retrying_task(retry_queue, retry_routing_key):
    """Decorates function to automatically add error callbacks."""
    def retrying_decorator(func):
        @celery.shared_task
        @wraps(func)  # just to keep the original task name
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # Monkey patch the apply_async method to add the callback.
        wrapper.apply_async = partial(
            wrapper.apply_async,
            link_error=error_callback.s(wrapper.name, retry_queue, retry_routing_key)
        )
        return wrapper
    return retrying_decorator


# Usage:
@retrying_task(retry_queue='another_queue', retry_routing_key='another_routing_key')
def failing_task():
    print 'Hi, I will fail!'
    raise Exception("I'm failing!")

failing_task.apply_async()

Вы можете настроить декоратор, чтобы передать любые параметры, которые вам нужны.

person Sébastien Deprez    schedule 25.07.2014
comment
Если я передам имя очереди также error_callback, то это несколько сработает. - person Vignesh; 25.07.2014
comment
Я обновил ответ, чтобы использовать декоратор, поэтому вам не нужно передавать имя задачи и устанавливать обратный вызов. Добавить имя очереди теперь также легко, так как это простой аргумент для декоратора. После обеда обновлю! - person Sébastien Deprez; 25.07.2014

У меня была аналогичная проблема, и я решил ее, возможно, не самым эффективным способом, но мое решение выглядит следующим образом:

Я создал модель django, чтобы сохранить все идентификаторы задач сельдерея, и она способна проверять состояние задачи.

Затем я создал еще одну задачу сельдерея, которая выполняется в бесконечном цикле и проверяет все задачи, которые «РАБОТАЮТ», в их фактическом состоянии, и, если состояние «НЕУДАЧНО», оно просто перезапускает его. На самом деле я не меняю очередь для задачи, которую я перезапускаю, но я думаю, что вы можете реализовать некоторую пользовательскую логику, чтобы решить, куда поместить каждую задачу, которую вы запускаете таким образом.

person canufeel    schedule 25.07.2014
comment
В нашей системе тоже была похожая логика, но каждый раз, когда задача создается или выполняется, делается запрос к базе данных. Также у нас было большое количество мелких задач, и таблица росла очень быстро. Из-за всего этого я хотел бы избежать использования базы данных. - person Vignesh; 25.07.2014