Програма с flask-socketio и многопроцесорни thorws „LoopExit: Тази операция ще блокира завинаги“

първо: аз съм абсолютно начинаещ в Python, писах PHP преди, така че ако получавам нещо напълно грешно, моля, уведомете ме.

Пиша приложение. Той трябва да обслужва информацията си чрез уеб сокети. Избрах flask-socketio за това. Във фонов режим искам да обработя данните. Тъй като бих искал приложението да е малко, се отказах от решение като Celery.

Съкратих кода до:

# -*- coding: utf8 -*-

from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import time
import os

def background_stuff(args):
    while True:
        try:
            print args
            time.sleep(1)
        except Exception as e:
            return e

thread = None
_pool = None

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    _pool = Pool(1)
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        workers = _pool.apply_async(
            func=background_stuff,
            args=('do background stuff',),
        )
    socketio.run(app)
    # app.run()

Когато стартирам това, получавам следните съобщения:

python test/multitest.py
 * Running on http://127.0.0.1:5000/
 * Restarting with stat
do background stuff
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 336, in _handle_tasks
    for taskseq, set_length in iter(taskqueue.get, None):
  File "/usr/lib/python2.7/Queue.py", line 168, in get
    self.not_empty.wait()
  File "/usr/lib/python2.7/threading.py", line 340, in wait
    waiter.acquire()
  File "gevent/_semaphore.pyx", line 112, in gevent._semaphore.Semaphore.acquire (gevent/gevent._semaphore.c:3386)
  File "/home/phil/work/ttimer/server/local/lib/python2.7/site-packages/gevent/hub.py", line 338, in switch
    return greenlet.switch(self)
LoopExit: This operation would block forever

do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860
do background stuff
do background stuff
do background stuff
do background stuff
^CProcess PoolWorker-1:
Process PoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 113, in worker
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
    result = (True, func(*args, **kwds))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
  File "test/multitest.py", line 14, in background_stuff
KeyboardInterrupt
    time.sleep(1)
KeyboardInterrupt
    return recv()
KeyboardInterrupt

Така фоновият процес работи и отговаря на http заявки (127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860). Но просто игнорирането на грешка, защото изглежда, че работи, не изглежда решение за мен. Някой може ли да ми каже какво правя грешно тук?

Ако кажеш, че не мога да го направя по този начин, можеш ли да ми кажеш защо? Бих искал да науча и да разбера какво правя грешно.

Четох нещо за monkepatching, но всичко предложено хвърли само повече или други грешки. Мисля, че е по-добре да работим върху първата грешка, вместо сляпо да опитваме поправки.

python -V
Python 2.7.9

Поздравления

актуализация

Добавих 2 реда за monkeypatching, ето какво получих:

$python multitest2.py 
 ^CProcess PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
KeyboardInterrupt
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
    task = get()
KeyboardInterrupt

 * Running on http://127.0.0.1:5000/
 * Restarting with stat
^CProcess PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
    task = get()
KeyboardInterrupt

    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
KeyboardInterrupt
do background stuff
FAILED to start flash policy server: [Errno 98] Address already in use: ('127.0.0.1', 10843)
$do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff

При стартиране изобщо няма изход. След натискане на ctrl-c няколко пъти, получавам изхода за фонови неща. Това продължава, докато не убия процеса на python чрез SIGKILL

актуализация 2

това, което очаквам да видя е

 * Running on http://127.0.0.1:5000/
 * Restarting with stat
do background stuff
do background stuff
do background stuff

веднага след изпълнението на скрипта. Но преди да натисна ctrl-c нищо не се случва.


person Frank Nord    schedule 29.09.2015    source източник


Отговори (2)


На първо място, трябва да сте наясно, че версията на Flask-SocketIO, която използвате, изисква gevent, което е сърутинна рамка. Използването на асинхронните съпрограми на gevent с многопроцесорен пул е странна комбинация. Вие използвате gevent, така че това, което би имало най-голям смисъл, е да използвате функционалността pool на gevent така че всичко да е последователно.

Що се отнася до проблема, мисля, че вероятно се дължи на липсата на корекция на стандартната библиотечна маймуна на ранен етап. Препоръчвам ви да добавите следните редове в най-горната част на вашия скрипт (над вашите импортирания, направете ги редове 1 и 2):

from gevent import monkey
monkey.patch_all()

Те ще гарантират, че всички извиквания към стандартната библиотека за неща като нишки, семафори и т.н. отиват към реализациите на gevent.

Актуализация: Опитах вашия пример. Оригиналната версия, без monkey-patch, работи добре за мен, не виждам грешката LoopExit, която сте докладвали. Добавянето на monkey patching предотвратява стартирането на фоновите процеси, както съобщихте.

Във всеки случай преобразувах вашия скрипт да използва gevent.pool и това работи надеждно за мен. Ето редактирания скрипт:

from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from gevent.pool import Pool
import time
import os

def background_stuff(args):
    while True:
        try:
            print args
            time.sleep(1)
        except Exception as e:
            return e

thread = None
_pool = None

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    _pool = Pool(1)
    workers = _pool.apply_async(
        func=background_stuff,
        args=('do background stuff',),
    )
    socketio.run(app)

Надявам се това да помогне!

person Miguel    schedule 30.09.2015
comment
› Използването на асинхронните съпрограмми на gevent с многопроцесорна група е странна комбинация. Мисля, че това е най-важното нещо, което трябва да знам. Четох за корекцията на маймуните и като вмъкна тези два реда, получавам други странни грешки. Бих искал да науча „правилния“ начин, така че ще прочета за gevent и ще опитам. Благодаря ви за този линк! - person Frank Nord; 30.09.2015
comment
@FrankNord Не съм сигурен, че има правилни или грешни начини. Моето твърдение, че това е странна комбинация, е защото използвате два напълно различни начина за постигане на многозадачност. Не бих казал обаче, че е грешно. Може би трябва да разгледаме тези грешки, които получавате, когато правите маймунски корекции, не се колебайте да добавите това към друг въпрос. - person Miguel; 30.09.2015
comment
Знам, че няма единствения начин, кавичките трябва да показват това. Но след вашето обяснение изглежда не е най-добрият начин за начинаещи като мен да смесват тези двете. Може би има случай, който се нуждае от това решение, но според мен са необходими знания за кабината. Както предложихте, опитах да добавя тези два реда и добавих резултата към първоначалния си въпрос. Ако искате, можете да помислите за това и да ми кажете какво се случва, аз съм готов да се науча. Но за моя малък проект ще се придържам към по-просто решение, искам да мога да напредвам с него. - person Frank Nord; 01.10.2015
comment
@FrankNord следите на стека, които сте добавили, са резултат от натискането на Ctrl-C според мен. Не виждам друг резултат, какво очаквате да видите? - person Miguel; 01.10.2015
comment
Добавих очакванията си към въпроса си - person Frank Nord; 01.10.2015
comment
Съжалявам, прочетох урока, за да науча за gevent и да намеря решение, което публикувах по-долу. Изобщо не те видях да актуализираш отговора си, така че отговорът ми не звучи много благодарен. Но много ви благодаря! Ти ми помогна много и вече знам нещо за Gevent. - person Frank Nord; 02.10.2015

Прочетох урок за gevent и намерих решение, което е просто и чисто за моите нужди:

# -*- coding: utf8 -*-

from flask import Flask
from flask.ext.socketio import SocketIO
import gevent
import os

def background_stuff():
    while True:
        try:
            print 'doing background work ... '
            gevent.sleep(1)
        except Exception as e:
            return e

app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)

@app.route('/', methods=['GET'])
def get_timers():
    return 'timer'

if __name__=='__main__':
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        gevent.spawn(background_stuff)
    socketio.run(app)

Урокът може да бъде намерен тук: http://sdiehl.github.io/gevent-tutorial/#long-polling

Той дори разказва за проблеми с gevent и мултипроцесиране: http://sdiehl.github.io/gevent-tutorial/#subprocess, но тъй като намерих просто решение, което отговаря на нуждите ми, не опитах нищо друго.

person Frank Nord    schedule 01.10.2015