Има опашки за грешки в celery

Има ли някакъв начин в celery, по който, ако изпълнението на задача е неуспешно, мога автоматично да я поставя в друга опашка.

Например, ако задачата се изпълнява в опашка x, при изключение я поставете в друга опашка с име error_x

Редактиране:

В момента използвам celery==3.0.13 заедно с django 1.4, Rabbitmq като брокер.

Понякога задачата се проваля. Има ли начин в celery да добавя съобщения към опашка за грешки и да ги обработва по-късно.

Проблемът, когато задачата за целина е неуспешна, е, че нямам достъп до името на опашката със съобщения. Така че не мога да използвам self.retry повторен опит, за да го поставя в друга опашка за грешки.


person Vignesh    schedule 24.07.2014    source източник


Отговори (2)


Е, не можете да използвате механизма retry, ако искате да насочите задачата към друга опашка. От документите:

retry() може да се използва за повторно изпълнение на задачата, например в случай на възстановими грешки.

Когато извикате повторен опит, той ще изпрати ново съобщение, използвайки същия идентификатор на задача, и ще се погрижи да се увери, че съобщението е доставено на същата опашка като първоначалната задача.

Ще трябва да рестартирате себе си и да го насочите ръчно към желаната от вас опашка в случай на повдигнато изключение. Изглежда добра работа за обратни извиквания за грешки.

Основният проблем е, че трябва да получим името на задачата в обратното извикване на грешката, за да можем да я стартираме. Също така може да не искаме да добавяме обратното извикване всеки път, когато стартираме задача. По този начин декораторът би бил добър начин за автоматично добавяне на правилното обратно извикване.

from functools import partial, wraps

import celery


@celery.shared_task
def error_callback(task_id, task_name, retry_queue, retry_routing_key):
    # We must retrieve the task object itself.
    # `tasks` is a dict of 'task_name': celery_task_object
    task = celery.current_app.tasks[task_name]
    # Re launch the task in specified queue.
    task.apply_async(queue=retry_queue, routing_key=retry_routing_key)


def retrying_task(retry_queue, retry_routing_key):
    """Decorates function to automatically add error callbacks."""
    def retrying_decorator(func):
        @celery.shared_task
        @wraps(func)  # just to keep the original task name
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # Monkey patch the apply_async method to add the callback.
        wrapper.apply_async = partial(
            wrapper.apply_async,
            link_error=error_callback.s(wrapper.name, retry_queue, retry_routing_key)
        )
        return wrapper
    return retrying_decorator


# Usage:
@retrying_task(retry_queue='another_queue', retry_routing_key='another_routing_key')
def failing_task():
    print 'Hi, I will fail!'
    raise Exception("I'm failing!")

failing_task.apply_async()

Можете да настроите декоратора, за да предава каквито параметри имате нужда.

person Sébastien Deprez    schedule 25.07.2014
comment
Ако предам името на опашката също на error_callback, тогава това е донякъде работещо. - person Vignesh; 25.07.2014
comment
Актуализирах отговора, за да използвам декоратор, така че да не се налага да предавате името на задачата и да задавате обратното извикване. Добавянето на името на опашката също вече е лесно, тъй като е прост аргумент за декоратора. Ще го актуализирам след обяд! - person Sébastien Deprez; 25.07.2014

Имах подобен проблем и го реших може да не е по най-ефективния начин, но въпреки това моето решение е следното:

Създадох django модел, за да запазя всички мои идентификационни номера на задачите от целина и който може да проверява състоянието на задачата.

След това създадох друга задача за целина, която се изпълнява в безкраен цикъл и проверява всички задачи, които се изпълняват, за действителното им състояние и ако състоянието е „НЕУСПЕШНО“, просто го изпълнява отново. Всъщност не променям опашката за задачата, която стартирам отново, но мисля, че можете да приложите някаква персонализирана логика, за да решите къде да поставите всяка задача, която стартирате отново по този начин.

person canufeel    schedule 25.07.2014
comment
Имахме подобна логика и в нашата система, но всеки път, когато се създава или изпълнява задача, се прави заявка към базата данни. Освен това имаме голям брой малки задачи и масата растеше с много бързи темпове. Поради всичко това бих искал да избегна използването на база данни. - person Vignesh; 25.07.2014