Celery Создание групповых задач из цепочки задач

Я пытаюсь связать следующие задачи с сельдереем (v4.0),

task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

Вышеупомянутая часть работает нормально до generate_job_requests как аккорд. Но проблема начинается с execute_job, откуда он получает список заданий от generate_job_requests, для которых мне нужно создавать параллельные задачи, а затем - совокупный результат всех заданий.

Я пытаюсь проверить, возможен ли такой график задач с сельдереем? Есть ли какой-либо альтернативный рабочий процесс для решения проблемы с такой зависимостью? Все, что мне не хватает в документации.


person Sanket Sudake    schedule 13.11.2016    source источник


Ответы (1)


Я использовал функциональность типа карты с создателем промежуточных задач, который действует как аккорд,

@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    callback = subtask(callback)
    grp = group(callback.clone([arg, ]) for arg in it)
    c = (grp | end_task)
    return c()

Таким образом, поток задач был сокращен как это,

task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s(
        execute_job.s(), aggregate_result.s())).apply_async()

Чтобы получить максимальную отдачу от задачи, я сделал несколько настроек,

# As we get dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# Get actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# Here we receive actual output of overall task
result.get()

Я сослался на ответ

person Sanket Sudake    schedule 20.11.2016