Я использую airflow и хочу иметь возможность отслеживать все файлы, созданные заданными экземплярами задач, в таблице airflow.file_list
, которая является частью той же базы данных, которая используется airflow (работает на postgres). Используя SQLAlchemy, у меня есть следующий преобразователь для моей таблицы file_list
:
from airflow.models import Base
class MySourceFile(Base):
""" SQLAlchemy mapper class for the file_list table entries."""
__table__ = Table('file_list', Base.metadata,
Column('UID', Integer, primary_key=True),
Column('task_id', String(_ID_LEN), nullable=False),
Column('dag_id', String(_ID_LEN), nullable=False),
Column('execution_date', DateTime, nullable=False),
Column('file_path', String(_ID_LEN), nullable=False),
Column('file_sha256', String(_ID_LEN), nullable=False),
ForeignKeyConstraint(
['task_id', 'dag_id', 'execution_date'],
['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date']
),
extend_existing=True,
)
instance_task = relationship(
TaskInstance,
primaryjoin=and_(
TaskInstance.task_id == __table__.c.task_id,
TaskInstance.dag_id == __table__.c.dag_id,
TaskInstance.execution_date == __table__.c.execution_date
),
viewonly=True,
foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date]
)
Я импортирую декларативную базу из airflow.modles
, потому что я читал, что для взаимодействующих картографов необходимо использовать один и тот же экземпляр базы. В приведенном выше фрагменте кода я хочу, чтобы instance_task
ссылался на экземпляр task_instance, создавший файл. Столбцы таблицы task_id
, dag_id
и execution_date
в моей таблице airflow.file_list
отражают первичные ключи в airflow.task_instance
. К сожалению, когда я запускаю сервер воздушного потока, я получаю следующую ошибку:
sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper|MySourceFile|file_list'. Original exception was: Can't determine relationship direction for relationship 'MySourceFile.instance_task' - foreign key columns are present in neither the parent nor the child's mapped tables
Я бы предпочел не изменять источник воздушного потока, если это возможно. Спасибо заранее за любую помощь.
extend_existing
, поэтому вы должны быть охвачены, даже если таблица существует в метаданных до этого определения и без fk, но сделайте двойную проверку. - person Ilja Everilä   schedule 04.10.2017