Сеанс SQLAlchemy очищен в задании celery и функции on_success

Я создаю инструмент, который извлекает данные из другой базы данных, преобразует их и сохраняет в моей собственной базе данных. Я перехожу с APScheduler на Celery, но столкнулся со следующей проблемой:

Я использую класс, который я называю JobRecords, чтобы хранить информацию о том, когда задание выполнялось, было ли оно успешным и какие ошибки возникали. Я использую это, чтобы не заглядывать слишком далеко назад в поисках обновленных записей, тем более что некоторые таблицы содержат несколько миллионов строк.

Поскольку система одинакова для всех вакансий, я создал подкласс из объекта celery Task. Я удостоверяюсь, что задание выполняется в контексте приложения Flask, и получаю последний раз, когда это задание было успешно завершено. Я также обязательно регистрирую значение для now, чтобы избежать проблем с синхронизацией между запросом к базе данных и добавлением записи о задании.

class RecordedTask(Task):
  """
  Task sublass that uses JobRecords to get the last run date
  and add new JobRecords on completion
  """
  now: datetime = None
  ignore_result = True

  _session: scoped_session = None
  success: bool = True
  info: dict = None

  @property
  def session(self) -> Session:
    """Making sure we have one global session instance"""
    if self._session is None:
      from app.extensions import db
      self._session = db.session
    return self._session

  def __call__(self, *args, **kwargs):
    from app.models import JobRecord

    kwargs['last_run'] = (
        self.session.query(func.max(JobRecord.run_at_))
        .filter(JobRecord.job_id == self.name, JobRecord.success)
        .first()
    )[0] or datetime.min
    self.now = kwargs['now'] = datetime.utcnow()

    with app.app_context():
      super(RecordedTask, self).__call__(*args, **kwargs)

  def on_failure(self, exc, task_id, args: list, kwargs: dict, einfo):
    self.session.rollback()
    self.success = False
    self.info = dict(
        args=args,
        kwargs=kwargs,
        error=exc.args,
        exc=format_exception(exc.__class__, exc, exc.__traceback__),
    )
    app.logger.error(f"Error executing job '{self.name}': {exc}")

  def on_success(self, retval, task_id, args: list, kwargs: dict):
    app.logger.info(f"Executed job '{self.name}' successfully, adding JobRecord")

    for entry in self.to_trigger:
      if len(entry) == 2:
        job, kwargs = entry
      else:
        job, = entry
        kwargs = {}
      app.logger.info(f"Scheduling job '{job}'")
      current_celery_app.signature(job, **kwargs).delay()

  def after_return(self, *args, **kwargs):
    from app.models import JobRecord
    record = JobRecord(
        job_id=self.name,
        run_at_=self.now,
        info=self.info,
        success=self.success
    )
    self.session.add(record)
    self.session.commit()
    self.session.remove()

Я добавил пример задания на обновление модели под названием Location, но подобных заданий много.

@celery.task(bind=True, name="update_locations")
def update_locations(self, last_run: datetime = datetime.min, **_):
  """Get the locations from the external database and check for updates"""
  locations: List[ExternalLocation] = ExternalLocation.query.filter(
      ExternalLocation.updated_at_ >= last_run
  ).order_by(ExternalLocation.id).all()

  app.logger.info(f"ExternalLocation: collected {len(locations)} updated locations")
  for update_location in locations:
    existing_location: Location = Location.query.filter(
        Location.external_id == update_location.id
    ).first()

    if existing_location is None:
      self.session.add(Location.from_worker(update_location))
    else:
      existing_location.update_from_worker(update_location)

Проблема в том, что когда я запускаю это задание, объекты Location не фиксируются с JobRecord, поэтому создаются только последние. Если я отслеживаю это с помощью отладчика, Location.query.count() возвращает правильное значение внутри функции, но как только он входит в обратный вызов on_success, он возвращается к 0, а self._session.new возвращает пустой словарь.

Я уже пытался добавить сеанс в качестве свойства, чтобы убедиться, что он везде один и тот же экземпляр, но проблема все еще сохраняется. Может быть, это как-то связано с тем, что он scoped_session из-за Flask-SQLAlchemy?

Извините за большое количество кода, я постарался убрать как можно больше. Любая помощь приветствуется!


person Ruben Helsloot    schedule 28.06.2019    source источник


Ответы (1)


Я обнаружил, что виновником была комбинация scoped_session и контекста приложения Flask. Как и любой контекстный менеджер, запуск кода with app.app_context() запускал функцию __exit__ при выходе, что, в свою очередь, приводило к очистке ScopedRegistry, где хранился scoped_session. Затем был создан новый сеанс, к нему были добавлены JobRecords, и этот сеанс был зафиксирован. Поэтому местоположения не будут записаны в базу данных.

Есть два возможных решения. Если вы не используете сеансы в других файлах, кроме вашей задачи, вы можете добавить к задаче свойство сеанса. Таким образом, вы полностью избегаете scoped_session и можете очистить свою функцию after_return.

 @property 
 def session(self):
   if self._session is None:
      from dashboard.extensions import db
      self._session = db.create_session(options={})()
    return self._session

Однако я также обращался к сеансу в файлах определения моей модели через from extensions import db. Поэтому я использовал два разных сеанса. В итоге я использовал app.app_context().push() вместо contextmanager, таким образом избегая функции __exit__

  app.app_context().push()
  super(RecordedTask, self).__call__(*args, **kwargs)
person Ruben Helsloot    schedule 30.06.2019