Программа с flask-socketio и multiprocessing thorws 'LoopExit: эта операция будет заблокирована навсегда'

во-первых: я абсолютный новичок в python, раньше я писал PHP, поэтому, если я получаю что-то совершенно неправильное, пожалуйста, дайте мне знать.

Я пишу приложение. Он должен предоставлять свою информацию через веб-сокеты. Для этого я выбрал flask-socketio. В фоновом режиме я хочу обрабатывать данные. Поскольку я хотел бы, чтобы приложение было небольшим, я отказался от такого решения, как Celery.

Я сократил код до:

# -*- coding: utf8 -*-

from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import time
import os

def background_stuff(args):
    while True:
        try:
            print args
            time.sleep(1)
        except Exception as e:
            return e

thread = None
_pool = None

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    _pool = Pool(1)
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        workers = _pool.apply_async(
            func=background_stuff,
            args=('do background stuff',),
        )
    socketio.run(app)
    # app.run()

При запуске я получаю следующие сообщения:

python test/multitest.py
 * Running on http://127.0.0.1:5000/
 * Restarting with stat
do background stuff
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 336, in _handle_tasks
    for taskseq, set_length in iter(taskqueue.get, None):
  File "/usr/lib/python2.7/Queue.py", line 168, in get
    self.not_empty.wait()
  File "/usr/lib/python2.7/threading.py", line 340, in wait
    waiter.acquire()
  File "gevent/_semaphore.pyx", line 112, in gevent._semaphore.Semaphore.acquire (gevent/gevent._semaphore.c:3386)
  File "/home/phil/work/ttimer/server/local/lib/python2.7/site-packages/gevent/hub.py", line 338, in switch
    return greenlet.switch(self)
LoopExit: This operation would block forever

do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860
do background stuff
do background stuff
do background stuff
do background stuff
^CProcess PoolWorker-1:
Process PoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 113, in worker
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
    result = (True, func(*args, **kwds))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
  File "test/multitest.py", line 14, in background_stuff
KeyboardInterrupt
    time.sleep(1)
KeyboardInterrupt
    return recv()
KeyboardInterrupt

Итак, фоновый процесс работает и отвечает на http-запросы (127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860). Но простое игнорирование ошибки, потому что она работает, не кажется мне решением. Кто-нибудь может сказать мне, что я делаю неправильно здесь?

Если вы скажете, что я не могу сделать это таким образом, можете ли вы сказать мне, почему? Я хотел бы узнать и понять, что я делаю неправильно.

Я читал что-то о монкепатчинге, но все, что предлагалось, выбрасывало больше или другие ошибки. Я думаю, что лучше работать над первой ошибкой, чем слепо пытаться ее исправить.

python -V
Python 2.7.9

Привет

обновить

Я добавил 2 строки для исправления обезьян, вот что я получил:

$python multitest2.py 
 ^CProcess PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
KeyboardInterrupt
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
    task = get()
KeyboardInterrupt

 * Running on http://127.0.0.1:5000/
 * Restarting with stat
^CProcess PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
    task = get()
KeyboardInterrupt

    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
KeyboardInterrupt
do background stuff
FAILED to start flash policy server: [Errno 98] Address already in use: ('127.0.0.1', 10843)
$do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff

При запуске вообще нет выхода. После нескольких нажатий ctrl-c я получаю вывод фонового материала. Это продолжается до тех пор, пока я не убью процесс python через SIGKILL.

обновление 2

то, что я ожидаю увидеть, это

 * Running on http://127.0.0.1:5000/
 * Restarting with stat
do background stuff
do background stuff
do background stuff

сразу после запуска скрипта. Но до того, как я нажму ctrl-c, ничего не происходит.


person Frank Nord    schedule 29.09.2015    source источник


Ответы (2)


Прежде всего, вам нужно знать, что версия Flask-SocketIO, которую вы используете, требует gevent, который является фреймворком сопрограммы. Использование асинхронных сопрограмм gevent с многопроцессорным пулом — странная комбинация. Вы используете gevent, поэтому наиболее целесообразно использовать функциональность gevent pool. чтобы все было последовательно.

Теперь, что касается проблемы, я думаю, что это, вероятно, связано с тем, что обезьяна из стандартной библиотеки не была исправлена ​​​​на ранней стадии. Я рекомендую вам добавить следующие строки в самый верх вашего скрипта (над вашими импортами сделайте их строками 1 и 2):

from gevent import monkey
monkey.patch_all()

Это гарантирует, что любые вызовы стандартной библиотеки для таких вещей, как потоки, семафоры и т. д., перейдут к реализациям gevent.

Обновление: я попробовал ваш пример. Оригинальная версия, без мартык-патча, у меня работает нормально, я не вижу ошибки LoopExit, о которой вы сообщили. Как вы сообщили, добавление исправления обезьяны предотвращает запуск фоновых процессов.

В любом случае, я преобразовал ваш скрипт для использования gevent.pool, и у меня он работает надежно. Вот отредактированный скрипт:

from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from gevent.pool import Pool
import time
import os

def background_stuff(args):
    while True:
        try:
            print args
            time.sleep(1)
        except Exception as e:
            return e

thread = None
_pool = None

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    _pool = Pool(1)
    workers = _pool.apply_async(
        func=background_stuff,
        args=('do background stuff',),
    )
    socketio.run(app)

Надеюсь это поможет!

person Miguel    schedule 30.09.2015
comment
› Использование асинхронных сопрограмм gevent с многопроцессорным пулом — странная комбинация. Думаю, это самое важное, что мне нужно знать. Я читал об исправлении обезьяны, и, вставляя эти две строки, я получаю другие странные ошибки. Я хотел бы узнать «правильный» способ, поэтому я прочитаю о gevent и попробую. Спасибо за эту ссылку! - person Frank Nord; 30.09.2015
comment
@FrankNord Не уверен, что есть правильные или неправильные пути. Я утверждаю, что это странная комбинация, потому что вы используете два совершенно разных способа достижения многозадачности. Хотя я бы не сказал, что это неправильно. Может быть, нам следует взглянуть на те ошибки, которые вы получаете, когда устанавливаете патчи для обезьян, не стесняйтесь добавлять это к другому вопросу. - person Miguel; 30.09.2015
comment
Я знаю, что нет единственного пути, кавычки должны показывать это. Но после вашего объяснения для таких новичков, как я, кажется не лучшим способом смешивать эти два. Может быть, есть случай, который нуждается в этом решении, но, я думаю, необходимы знания о стенде. Как вы предложили, я попытался добавить эти две строки и добавил результат к своему первоначальному вопросу. Если хотите, вы можете подумать об этом и рассказать мне, что происходит, я готов учиться. Но для моего маленького проекта я буду придерживаться более простого решения, я хочу иметь возможность прогрессировать с ним. - person Frank Nord; 01.10.2015
comment
@FrankNord добавленные вами трассировки стека являются результатом того, что вы нажали Ctrl-C, я думаю. Я не вижу никакого другого выхода, что вы ожидаете увидеть? - person Miguel; 01.10.2015
comment
Я добавил свои ожидания к моему вопросу - person Frank Nord; 01.10.2015
comment
Извините, я прочитал учебник, чтобы узнать о gevent и найти решение, которое я разместил ниже. Я совершенно не видел, чтобы вы обновили свой ответ, поэтому мой ответ звучит не очень благодарно. Но большое спасибо! Вы мне очень помогли, и теперь я кое-что знаю о gevent. - person Frank Nord; 02.10.2015

Я прочитал учебник о gevent и нашел решение, которое является простым и понятным для моих нужд:

# -*- coding: utf8 -*-

from flask import Flask
from flask.ext.socketio import SocketIO
import gevent
import os

def background_stuff():
    while True:
        try:
            print 'doing background work ... '
            gevent.sleep(1)
        except Exception as e:
            return e

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        gevent.spawn(background_stuff)
    socketio.run(app)

Учебник можно найти здесь: http://sdiehl.github.io/gevent-tutorial/#long-polling

Там даже рассказывается о проблемах с gevent и многопроцессорностью: http://sdiehl.github.io/gevent-tutorial/#subprocess , но поскольку я нашел простое решение, соответствующее моим потребностям, я не пробовал ничего другого.

person Frank Nord    schedule 01.10.2015