Flask-SocketIO — Как генерировать событие из подпроцесса

У меня есть приложение Flask, которое при определенном вызове отдыха запускает несколько модулей с использованием ProcessPoolExecutor.

ОБНОВЛЕНО: добавлен Redis в качестве очереди сообщений (с использованием Docker, Redis в качестве хоста Redis)

socketio = SocketIO(app, message_queue='redis://redis')

(...)

def emit_event(evt, message):
    socketio.emit(evt, message, namespace='/test')

@app.route('/info', methods=['GET'])
def info():
    emit_event('update_reports', '')

(...)
if __name__ == "__main__":
    socketio.run(host='0.0.0.0', threaded=True)

Теперь, когда я добавил Redis, он по-прежнему работает при отправке из основного приложения. Вот некоторые из кода, в котором я запускаю подпроцесс:

def __init__(self):
    self.executor = futures.ProcessPoolExecutor(max_workers=4)
    self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')

    (...)
    future = self.executor.submit(process, params)
    future.add_done_callback(functools.partial(self.finished_callback, pid))

Затем в этом обратном вызове я вызываю метод emit_event:

def finished_callback(self, pid, future):
    pid.status = Status.DONE.value
    pid.finished_at = datetime.datetime.utcnow
    pid.save()

    self.socketio.emit('update_reports', 'done', namespace='/test')

Получение и отправка/отправка сообщений от/к клиенту с моего контроллера работает нормально, также, если я вызываю /info из curl или postman, мой клиент получает сообщение, но при попытке отправить событие таким же образом из этого обратного вызова подпроцесса, теперь он показывает эту ошибку:

Это в основном для уведомлений, таких как уведомление о завершении длительного процесса и тому подобное.

INFO:socketio:emitting event "update_reports" to all [/test] ERROR:socketio:Cannot publish to redis... retrying ERROR:socketio:Cannot publish to redis... giving up

Что я делаю неправильно?

Спасибо!


person magnoz    schedule 30.08.2018    source источник
comment
Можете ли вы добавить код, который вы используете для генерации из вашего подпроцесса? Также вы упомянули маршрут /info, но в вашем примере есть /test. Я предполагаю, что это маршрут, который вы вызываете из завитка?   -  person Miguel    schedule 31.08.2018
comment
спасибо за ваш ответ, Мигель, в подпроцессе я просто импортирую метод emit_event из основного файла py, в котором он находится, и использую его так же, как я использую его в этом вызове /test. Да, это URL, который я вызываю. И в результате из подпроцесса никогда не приходит событие/сообщение (и в консоли нет ошибки)   -  person magnoz    schedule 02.09.2018
comment
@Miguel Мигель, я только что добавил еще немного кода, дайте мне знать, что вы думаете   -  person magnoz    schedule 02.09.2018


Ответы (1)


Существуют определенные правила, которым необходимо следовать при настройке расширения Flask-SocketIO, чтобы внешние процессы могли генерировать сообщения, в том числе использование очереди сообщений, которую основной и внешний процессы используют для координации усилий. См. Отправка из внешнего процесса. раздел документации для инструкций.

person Miguel    schedule 03.09.2018
comment
Спасибо за Ваш ответ. Я только что попробовал запустить то, что там написано из моего подпроцесса, однако это то, что я получаю в журналах Cannot publish to redis... retrying, giving up - person magnoz; 03.09.2018
comment
Я только что обновил вопрос с более подробной информацией и кодом. - person magnoz; 03.09.2018
comment
Что ж, кажется, ваши подпроцессы не подключаются к Redis. Можете ли вы попробовать написать простой скрипт, который подключается к очереди сообщений и отправляет сообщения? Если это работает, вам нужно выяснить, что в вашем дочернем процессе влияет на подключение к Redis. - person Miguel; 04.09.2018
comment
Правильно, и я исправил это сразу. Также вверху добавлена ​​вещь eventlet.monkey_patch(). И теперь событие приходит из основного потока, но из подпроцесса я получаю другую ошибку: AttributeError: module 'select' has no attribute 'poll', (и, конечно же, сообщение также не приходит, потому что оно использовалось ранее) есть идеи? - person magnoz; 04.09.2018
comment
Подпроцессы не должны использовать eventlet, только основной процесс. Кажется, что-то, что вы делаете в своих подпроцессах, несовместимо с eventlet, поэтому лучше не используйте его там. Вы можете заставить Socket.IO не использовать eventlet, передав async_mode='threading' в конструкторе класса SocketIO и удалив все исправления обезьян. - person Miguel; 05.09.2018
comment
Я так и сделал и безуспешно удалил файл monkey_patch из класса подпроцесса. Однако я попытался удалить функции monkey_patching и redis из основного процесса, и подпроцесс снова заработал. --На самом деле-- подпроцесс работал, проблема заключалась в том, что он ломался здесь future.add_done_callback(functools.partial(self.finished_callback, pid))' та же ошибка и обратный вызов не вызывался. - person magnoz; 05.09.2018
comment
Кстати, этот подпроцесс вызывается из другого класса, который используется в том же приложении фляги, может быть, это как-то влияет? Я ценю вашу помощь. - person magnoz; 05.09.2018
comment
Каким-то образом похоже, что ProcessPoolExecutor не нравится это исправление обезьяны, хотя я сделал monkey_patch в основном классе; выглядит это действительно влияет на все? - person magnoz; 05.09.2018
comment
Можете ли вы запустить свой дочерний процесс (ы) отдельно? Я сам не работал с исполнителем, я предпочитаю держать вещи отдельно и запускать каждый сервис самостоятельно. Если вы ищете альтернативы, которые, как я видел, успешно использовались, Celery и RedisQueue оба хороши, но я настаиваю на том, чтобы вы удостоверились, что эти рабочие процессы не используют eventlet, поскольку кажется, что вы используете функцию, которую eventlet не поддерживает. - person Miguel; 05.09.2018
comment
Тогда я проверю сельдерей, кстати, я только что попытался заменить ProcessPoolExecutor на ThreadPoolExecutor, и он не использует, и, согласно журналам, он генерирует событие, но клиент никогда его не получает INFO:socketio:emitting event "update_reports" to all [/test] - person magnoz; 05.09.2018
comment
Я попытался запустить отдельный скрипт, и он работает. Теперь я также вижу, что использование ThreadExecutorPool также работает (не с Process). Я мог бы подумать о переходе на Celery. Спасибо большое за вашу помощь! - person magnoz; 05.09.2018