Связь SQLAlchemy с таблицей task_instance в воздушном потоке

Я использую airflow и хочу иметь возможность отслеживать все файлы, созданные заданными экземплярами задач, в таблице airflow.file_list, которая является частью той же базы данных, которая используется airflow (работает на postgres). Используя SQLAlchemy, у меня есть следующий преобразователь для моей таблицы file_list:

from airflow.models import Base

class MySourceFile(Base):
    """ SQLAlchemy mapper class for the file_list table entries."""
    __table__ = Table('file_list', Base.metadata,
        Column('UID', Integer, primary_key=True),
        Column('task_id', String(_ID_LEN), nullable=False),
        Column('dag_id', String(_ID_LEN), nullable=False),
        Column('execution_date', DateTime, nullable=False),
        Column('file_path', String(_ID_LEN), nullable=False),
        Column('file_sha256', String(_ID_LEN), nullable=False),
        ForeignKeyConstraint(
            ['task_id', 'dag_id', 'execution_date'],
            ['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date']
        ),
        extend_existing=True,
    )

    instance_task = relationship(
    TaskInstance,
    primaryjoin=and_(
        TaskInstance.task_id == __table__.c.task_id,
        TaskInstance.dag_id == __table__.c.dag_id,
        TaskInstance.execution_date == __table__.c.execution_date
    ),
    viewonly=True,
    foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date]
)

Я импортирую декларативную базу из airflow.modles, потому что я читал, что для взаимодействующих картографов необходимо использовать один и тот же экземпляр базы. В приведенном выше фрагменте кода я хочу, чтобы instance_task ссылался на экземпляр task_instance, создавший файл. Столбцы таблицы task_id, dag_id и execution_date в моей таблице airflow.file_list отражают первичные ключи в airflow.task_instance. К сожалению, когда я запускаю сервер воздушного потока, я получаю следующую ошибку:

sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper|MySourceFile|file_list'. Original exception was: Can't determine relationship direction for relationship 'MySourceFile.instance_task' - foreign key columns are present in neither the parent nor the child's mapped tables

Я бы предпочел не изменять источник воздушного потока, если это возможно. Спасибо заранее за любую помощь.


person AdvancedGarde89    schedule 04.10.2017    source источник
comment
Это не тот ответ, который вы ищете, но с установкой воздушного потока и использованием вашей модели все в порядке. Я бы убедился, что используемая таблица действительно имеет ограничение fk. Вы используете extend_existing, поэтому вы должны быть охвачены, даже если таблица существует в метаданных до этого определения и без fk, но сделайте двойную проверку.   -  person Ilja Everilä    schedule 04.10.2017


Ответы (1)


Во-первых, указание __table__ таким образом является странным (я никогда раньше этого не видел), и, как правило, все, что начинается с символа подчеркивания, является приватным, и его следует избегать.

Вам нужно указать, что task_id является внешним ключом. Что-то вроде этого:

task_id = Column(String, ForeignKey('task.id'))

Если вы не их. А шаблон backref может быть полезен для создания отношения из экземпляра задачи. вплоть до вашего пользовательского файла без необходимости изменять модель TaskInstance.

person Ash Berlin-Taylor    schedule 04.10.2017
comment
Указание столбцов/таблиц так, как это сделал я, называется Гибридный подход в документации. Ранее я использовал декларативный подход, который вы предлагаете, но безуспешно. - person AdvancedGarde89; 04.10.2017
comment
Ах да. Тем не менее, вы должны быть в состоянии передать ForeignKey(task_instance.id) вызову Column() в этом режиме. - person Ash Berlin-Taylor; 04.10.2017
comment
Ой. Я полностью пропустил, что это был составной внешний ключ. - person Ash Berlin-Taylor; 04.10.2017