Сесията на SQLAlchemy е изчистена в задачата celery и функцията on_success

Създавам инструмент, който извлича данни от различна база данни, трансформира ги и ги съхранява в моята собствена база данни. Мигрирам от APScheduler към Celery, но се натъкнах на следния проблем:

Използвам клас, който наричам JobRecords, за да съхраня кога е изпълнена задача, дали е била успешна и какви грешки е срещнала. Използвам това, за да знам, че не гледам много назад за актуализирани записи, особено след като някои таблици имат множество милиони редове.

Тъй като системата е една и съща за всички задачи, създадох подклас от обекта celery Task. Уверявам се, че заданието се изпълнява в контекста на приложението Flask и извличам последния път, когато това задание е приключило успешно. Също така се уверявам, че регистрирам стойност за now, за да избегна проблеми с времето между заявката в базата данни и добавянето на записа на заданието.

class RecordedTask(Task):
  """
  Task sublass that uses JobRecords to get the last run date
  and add new JobRecords on completion
  """
  now: datetime = None
  ignore_result = True

  _session: scoped_session = None
  success: bool = True
  info: dict = None

  @property
  def session(self) -> Session:
    """Making sure we have one global session instance"""
    if self._session is None:
      from app.extensions import db
      self._session = db.session
    return self._session

  def __call__(self, *args, **kwargs):
    from app.models import JobRecord

    kwargs['last_run'] = (
        self.session.query(func.max(JobRecord.run_at_))
        .filter(JobRecord.job_id == self.name, JobRecord.success)
        .first()
    )[0] or datetime.min
    self.now = kwargs['now'] = datetime.utcnow()

    with app.app_context():
      super(RecordedTask, self).__call__(*args, **kwargs)

  def on_failure(self, exc, task_id, args: list, kwargs: dict, einfo):
    self.session.rollback()
    self.success = False
    self.info = dict(
        args=args,
        kwargs=kwargs,
        error=exc.args,
        exc=format_exception(exc.__class__, exc, exc.__traceback__),
    )
    app.logger.error(f"Error executing job '{self.name}': {exc}")

  def on_success(self, retval, task_id, args: list, kwargs: dict):
    app.logger.info(f"Executed job '{self.name}' successfully, adding JobRecord")

    for entry in self.to_trigger:
      if len(entry) == 2:
        job, kwargs = entry
      else:
        job, = entry
        kwargs = {}
      app.logger.info(f"Scheduling job '{job}'")
      current_celery_app.signature(job, **kwargs).delay()

  def after_return(self, *args, **kwargs):
    from app.models import JobRecord
    record = JobRecord(
        job_id=self.name,
        run_at_=self.now,
        info=self.info,
        success=self.success
    )
    self.session.add(record)
    self.session.commit()
    self.session.remove()

Добавих пример за задача за актуализиране на модел, наречен Location, но има много задачи точно като тази.

@celery.task(bind=True, name="update_locations")
def update_locations(self, last_run: datetime = datetime.min, **_):
  """Get the locations from the external database and check for updates"""
  locations: List[ExternalLocation] = ExternalLocation.query.filter(
      ExternalLocation.updated_at_ >= last_run
  ).order_by(ExternalLocation.id).all()

  app.logger.info(f"ExternalLocation: collected {len(locations)} updated locations")
  for update_location in locations:
    existing_location: Location = Location.query.filter(
        Location.external_id == update_location.id
    ).first()

    if existing_location is None:
      self.session.add(Location.from_worker(update_location))
    else:
      existing_location.update_from_worker(update_location)

Проблемът е, че когато стартирам тази задача, обектите Location не са ангажирани с JobRecord, така че се създава само последният. Ако го проследя с дебъгера, Location.query.count() връща правилната стойност във функцията, но веднага щом влезе в обратното извикване on_success, се връща на 0, а self._session.new връща празен dict.

Вече се опитах да добавя сесията като свойство, за да се уверя, че е един и същ екземпляр навсякъде, но проблемът продължава да съществува. Може би има нещо общо с това, че е scoped_session заради Flask-SQLAlchemy?

Съжалявам за голямото количество код, опитах се да премахна възможно най-много. Всяка помощ е добре дошла!


person Ruben Helsloot    schedule 28.06.2019    source източник


Отговори (1)


Разбрах, че виновникът е комбинацията от scoped_session и контекста на приложението Flask. Както всеки контекстен мениджър, изпълнението на кода with app.app_context() задейства функцията __exit__ при напускане, което на свой ред доведе до изчистване на ScopedRegistry, където се съхранява scoped_session. След това беше създадена нова сесия, към нея бяха добавени JobRecords и тази сесия беше ангажирана. Следователно местоположенията няма да бъдат записани в базата данни.

Има две възможни решения. Ако не използвате сесии в други файлове освен във вашата задача, можете да добавите свойство на сесия към задачата. По този начин избягвате напълно scoped_session и можете да изчистите във вашата after_return функция.

 @property 
 def session(self):
   if self._session is None:
      from dashboard.extensions import db
      self._session = db.create_session(options={})()
    return self._session

Въпреки това имах достъп до сесията и в моите файлове с дефиниция на модела чрез from extensions import db. Следователно използвах две различни сесии. В крайна сметка използвах app.app_context().push() вместо contextmanager, като по този начин избегнах функцията __exit__

  app.app_context().push()
  super(RecordedTask, self).__call__(*args, **kwargs)
person Ruben Helsloot    schedule 30.06.2019