Многопроцесорната обработка на Python3 в Windows срива ZeroMQ код, работещ на Linux

Кодирам сървър, базиран на ZeroMQ, който трябва да инстанцира някои работници (дефинирани от техните собствени скриптове), с които комуникирам чрез multiprocessing.Queue-s.

По принцип имам:
- един основен клас, управляващ цялата комуникация с другите среди, в които има много неща, включително по-специално:
- списък с работници, всеки от тях получават някои инструкции чрез опашка.

Забавната част е: Трябва да имам няколко процеса, които могат да комуникират с тези работници паралелно. (да кажем, за да приложим функция за "безопасно спиране" например)

Всичко работи перфектно на Linux, но получавам много проблеми на Windows. По-специално получавам тази грешка, която изглежда идва от multiprocessing.spawn.py:

no default __reduce__ due to non-trivial __cinit__

Възпроизвеждах грешката с този минимален работен код

from multiprocessing import Process
import numpy as np
import zmq
import time

class myClass():
    def __init__(self):
        self.context = zmq.Context()
        #many zmq stuff
    def foo(self, bar):
        print( bar )
    def run(self):
        while True :
            time.sleep(1)
            a = np.array([1,2,3])
            dico = {"a":a}
            Process(target=self.foo, args=(dico,)).start()

if __name__ == "__main__":
    b = myClass()
    b.run()

Потърсих го и някак си изглежда, че трябва да предефинирам контекста всеки път, когато извикам "run", което не може да бъде направено, защото трябва да изпратя много данни през тези опашки с висока скорост.

Ако някой има представа какво да правя...


person jpaparone    schedule 13.11.2019    source източник


Отговори (1)


Ако някой има представа какво да прави...

Първо, добре дошли в домейна на Zen-of-Zero

В случай, че човек никога не е работил с ZeroMQ,
тук може да се наслади първо да погледне "ZeroMQ Принципи за по-малко от Пет секунди"
преди да се потопите в допълнителни подробности


Ако проблемът се появи само в Windows (не в O/S-es от клас linux), човек би се изкушил да предложи най-простата стъпка, а не да...

И все пак ще устоя на такова изкушение и ще започна със стъпки за изолиране на първопричината:

Windows-клас O/S-es използват друга форма на инстанциране на процеси в multiprocessing

Най-добрата следваща стъпка: оценете цялата си екосистема за изпълнение на код:

може да използва поставения по-долу шаблон за тестване, за да добави допълнително към вашите експерименти, свързани с класа.

Частта за ZeroMQ ме притеснява. Като се има предвид, че Context()-инстанцията се създава по време на .__init__(), multiprocessing-spawned-Process() трябва да извърши топ- надолу пълно копие на извикващия-процес ( да, целият процес на интерпретатор на python, копиране на всички променливи, пълни данни за състоянието на ресурсите на интерпретатора, ... задължителен spawn-метод на windows ( не всяка не-win налична ефективност от fork, forserver налична в win ) просто копира всичко - включен Context()-instance... )

Ако Process-инстанциите останат полупостоянни (и могат да останат такива за потенциално бъдещи повторни употреби) през целия живот на __main__, Context()-инстанцията остава в състоянието на предварително породеното .__init__()-извикване и се опитва да командва "споделен"-Context()-thread реплики (скрити във всяко от __main__ копията на процеса). Засега няма сблъсък, но #many zmq stuff може да причини проблема, тъй като ZeroMQ .Socket()-инстанциите не са безопасни за нишки (както често се предупреждава в документацията на API) но са също така пълноценни обекти, така че „репликираните“ пълни копия могат лесно да превърнат нещата в хаос

Задача: опитайте се да документирате и изолирате проблема - развитие -

най-добре чрез POSACK-отчитане (почти)-всеки изпълнен ред, стигайки до последния изпълнен и докладван ред преди срива ( представеното по-горе следсмъртно проследяване е твърде двусмислено, за да се вземе решение. __cinit__ може да се отнася до милиони места , където всъщност се провали)

този шаблон може да помогне за това:

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # 

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

може да изглежда нещо подобно на O/S от клас linux:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
person user3666197    schedule 13.11.2019
comment
Благодаря за вашият отговор ! Той се срива точно при стартиране на Process(...).start(). Функцията дори не се извиква, единственият факт на стартиране на процеса причинява срива. Може би не използвам правилния инструмент. По принцип трябва да имам много процеси, способни да поставят скриптове в опашката на много процеси. Може би трябва да започна всичко отначало с друга библиотека? От сега нататък ZeroMQ се използва само на по-високо ниво. - person jpaparone; 15.11.2019
comment
@jpaparone Първоначалното ми впечатление беше, че ZeroMQ не е основната причина за проблема. Фазата SER/DES-Pickle е моят заподозрян, за да потърся по-нататък - може да прочетете повече информация за проблемите на Windows с метода spawn() на подпроцесите за инстанциране тук: stackoverflow.com/a/58869967/3666197 - person user3666197; 15.11.2019