У меня есть основной даг, который извлекает файл и разбивает данные в этом файле на отдельные файлы csv. У меня есть другой набор задач, которые необходимо выполнить для каждого файла из этих файлов csv. например (загрузка в GCS, вставка в BigQuery) Как я могу динамически сгенерировать SubDag для каждого файла в зависимости от количества файлов? SubDag будет определять такие задачи, как загрузка в GCS, вставка в BigQuery, удаление файла csv)
Так вот как это выглядит сейчас
main_dag = DAG(....)
download_operator = SFTPOperator(dag = main_dag, ...) # downloads file
transform_operator = PythonOperator(dag = main_dag, ...) # Splits data and writes csv files
def subdag_factory(): # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
...
...
Как я могу вызвать subdag_factory для каждого файла, созданного в transform_operator?