Как установить равный приоритет_вес для задачи, которая зависит от другой задачи

У меня 8 комплектов задач. Каждый набор представляет собой серию задач: задача1 >> задача2 >> задача3. task3 зависит от task2, так как task2 зависит от task1.

Моя проблема в том, что задача 2 никогда не запускается, пока не будут завершены все задачи 1. Итак, для запуска set1.task2 он должен сначала запустить set8.task1.

Мое первоначальное исследование касается того, что priority_weight может быть включено в default_args для группы DAG. Я узнал, что задача task1 будет иметь более высокий приоритет для подчиненных.

Есть ли способ, при котором все веса приоритета могут быть одинаковыми. Таким образом, set1.task2 уже может запускаться независимо от set2,3 и т. Д., Поскольку он просто зависит от set1.task1.

Спасибо!


person Nikko    schedule 21.05.2019    source источник


Ответы (1)


Установка weight_rule на «восходящий поток» или «абсолютный» должна помочь. Это из BaseOperator строки документации:

:param weight_rule: weighting method used for the effective total
    priority weight of the task. Options are:
    ``{ downstream | upstream | absolute }`` default is ``downstream``
    When set to ``downstream`` the effective weight of the task is the
    aggregate sum of all downstream descendants. As a result, upstream
    tasks will have higher weight and will be scheduled more aggressively
    when using positive weight values. This is useful when you have
    multiple dag run instances and desire to have all upstream tasks to
    complete for all runs before each dag can continue processing
    downstream tasks. When set to ``upstream`` the effective weight is the
    aggregate sum of all upstream ancestors. This is the opposite where
    downtream tasks have higher weight and will be scheduled more
    aggressively when using positive weight values. This is useful when you
    have multiple dag run instances and prefer to have each dag complete
    before starting upstream tasks of other dags.  When set to
    ``absolute``, the effective weight is the exact ``priority_weight``
    specified without additional weighting. You may want to do this when
    you know exactly what priority weight each task should have.
    Additionally, when set to ``absolute``, there is bonus effect of
    significantly speeding up the task creation process as for very large
    DAGS. Options can be set as string or using the constants defined in
    the static class ``airflow.utils.WeightRule``

Ссылка: https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py#L129-L150

person bosnjak    schedule 21.05.2019
comment
где вы размещаете этот аргумент? в аргументах по умолчанию группы DAG? - person Nikko; 22.05.2019
comment
Спасибо! Я ставлю это внутри каждой задачи. Я не уверен, может ли это быть аргументом по умолчанию в DAG. - person Nikko; 22.05.2019
comment
Я верю, что может, попробуйте. - person bosnjak; 22.05.2019