Как я могу захватить все записи журнала Python, созданные во время выполнения ряда задач Celery?

Я хочу преобразовать свою доморощенную систему очереди задач в очередь задач на основе Celery, но одна функция, которая у меня есть в настоящее время, вызывает у меня некоторые затруднения.

Прямо сейчас моя очередь задач работает очень грубо; Я запускаю задание (которое генерирует данные и загружает их на другой сервер), собираю журналы, используя вариант библиотеки захвата журналов Nose, а затем сохраняю журналы для задачи в виде подробной записи результатов в базе данных приложения.

Я хотел бы разбить это на три задачи:

  1. собирать данные
  2. загрузить данные
  3. отчет о результатах (включая все журналы двух предыдущих задач)

Настоящим кикером здесь является сбор журналов. Прямо сейчас, используя захват журнала, у меня есть серия записей журнала для каждого вызова журнала, сделанного в процессе создания и загрузки данных. Они необходимы для диагностических целей. Учитывая, что выполнение задач даже не гарантируется в одном и том же процессе, неясно, как я буду выполнять это в очереди задач Celery.

Мое идеальное решение этой проблемы будет тривиальным и в идеале минимально инвазивным методом сбора всех журналов во время предшествующих задач (1, 2) и предоставления их для задачи репортера (3)

Не лучше ли мне оставаться довольно грубым с определением задачи и поместить всю эту работу в одну задачу? или есть способ передать существующие захваченные журналы, чтобы собрать их в конце?


person Chris R    schedule 11.11.2010    source источник


Ответы (3)


Я предполагаю, что вы используете модуль logging. Вы можете использовать отдельный именованный регистратор для каждого набора задач для выполнения этой работы. Они наследуют всю конфигурацию с верхнего уровня.

in task.py:

import logging

@task
step1(*args, **kwargs):
    # `key` is some unique identifier common for a piece of data in all steps of processing
    logger = logging.getLogger("myapp.tasks.processing.%s"%key)
    # ...
    logger.info(...) # log something

@task
step2(*args, **kwargs):
    logger = logging.getLogger("myapp.tasks.processing.%s"%key)
    # ...
    logger.info(...) # log something

Здесь все записи были отправлены в регистратор с одним и тем же именем. Теперь вы можете использовать 2 подхода для получения этих записей:

  1. Настройте прослушиватель файлов с именем, которое зависит от имени регистратора. После последнего шага просто прочитайте всю информацию из этого файла. Убедитесь, что для этого прослушивателя отключена буферизация вывода, иначе вы рискуете потерять записи.

  2. Создайте собственный слушатель, который будет накапливать записи в памяти, а затем возвращать их все, когда об этом будет сказано. Я бы использовал memcached для хранения здесь, это проще, чем создавать собственное хранилище между процессами.

person Alexander Lebedev    schedule 04.01.2011

Похоже, что какой-то «наблюдатель» был бы идеальным. Если вы можете просматривать и потреблять журналы в виде потока, вы можете получать результаты по мере их поступления. Поскольку наблюдатель будет работать отдельно и, следовательно, не будет зависеть от того, что он наблюдает, я полагаю, что это удовлетворит ваши требования для не - инвазивное решение.

person Lloyd Moore    schedule 14.12.2010

Django Sentry — это утилита ведения журнала для Python (и Django), которая поддерживает Celery.

person Brendon Crawford    schedule 23.12.2010