from my_celery import app
from delete_duplicate import DeleteDuplicate
import celery
class CallbackTask(celery.Task):
def on_success(self, retval, task_id, args, kwargs):
print "Task id: ", task_id
print "retval ", retval
print "Args ", args
print "kwargs: ", kwargs
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
class Ott(object):
@app.task(base=CallbackTask())
def delete_duplicate_ott():
print "Inside ott.... "
DeleteDuplicate().delete_duplicate_ott()
@app.task(base=CallbackTask())
def delete_duplicate_noti():
print "Inside noti.... "
DeleteDuplicate().delete_duplicate_noti()
Я использовал обратный вызов в celery для реализации вышеуказанного обратного вызова.
Код выше работает отлично. Я хочу передать параметры в обратных вызовах on_success() и on_failure() в args или kwargs. Args и Kwargs печатают None после успешного выполнения задачи.
Я ищу в Интернете, не нахожу решения, которое следует за этим обратным вызовом с использованием наследования (подкласс).
Любая помощь высоко ценится. Спасибо!
@app.task(bound=True, base=CallbackTask())
, установив атрибут внутри своей задачиself.on_success_args = ...
, а затем вCallbackTask.on_success
проверив эти переменные. Но это был бы не лучший подход - person Patricio   schedule 05.06.2019