Как поддерживать глобальные процессы в пуле, работающие рекурсивно?

Я хочу реализовать рекурсивный параллельный алгоритм, и я хочу, чтобы пул создавался только один раз, и каждый временной шаг выполнял задание, ждал завершения всех заданий, а затем снова вызывал процессы с входами предыдущих выходов, а затем снова то же самое в следующий временной шаг и т. д.

Моя проблема в том, что я реализовал версию, в которой каждый раз я создаю и уничтожаю пул, но это очень медленно, даже медленнее, чем последовательная версия. Когда я пытаюсь реализовать версию, в которой пул создается только один раз в начале, я получаю ошибку утверждения, когда пытаюсь вызвать join().

это мой код

def log_result(result):

    tempx , tempb, u = result

    X[:,u,np.newaxis], b[:,u,np.newaxis] = tempx , tempb


workers =  mp.Pool(processes = 4) 
for t in range(p,T):

    count = 0 #==========This is only master's job=============
    for l in range(p):
        for k in range(4):
            gn[count]=train[t-l-1,k]
            count+=1
    G = G*v +  gn @ gn.T#==================================

    if __name__ == '__main__':
        for i in range(4):
            workers.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis], i, gn), callback = log_result)


        workers.join()   

X и b — это матрицы, которые я хочу обновить непосредственно в памяти мастера.

Что здесь не так, и я получаю ошибку утверждения?

Могу ли я реализовать с помощью пула то, что хочу, или нет?


person Bekromoularo    schedule 09.03.2018    source источник


Ответы (1)


Вы не можете присоединиться к пулу, который не был закрыт первым, так как join() будет ждать завершения рабочих процессов, а не выполнения заданий (https://docs.python.org/3.6/library/multiprocessing.html раздел 17.2.2.9).

Но поскольку это закроет пул, а это не то, что вам нужно, вы не можете использовать это. Таким образом, соединение отсутствует, и вам нужно реализовать «ожидание завершения всех заданий» самостоятельно.

Один из способов сделать это без циклов занятости — использовать очередь. Вы также можете работать с ограниченными семафорами, но они работают не во всех операционных системах.

counter = 0
lock_queue = multiprocessing.Queue()
counter_lock = multiprocessing.Lock()

def log_result(result):

    tempx , tempb, u = result

    X[:,u,np.newaxis], b[:,u,np.newaxis] = tempx , tempb
    with counter_lock:
        counter += 1
        if counter == 4:
            counter = 0
            lock_queue.put(42)



workers =  mp.Pool(processes = 4) 
for t in range(p,T):

    count = 0 #==========This is only master's job=============
    for l in range(p):
        for k in range(4):
            gn[count]=train[t-l-1,k]
            count+=1
    G = G*v +  gn @ gn.T#==================================

    if __name__ == '__main__':
        counter = 0
        for i in range(4):
            workers.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis], i, gn), callback = log_result)


        lock_queue.get(block=True)

Это сбрасывает глобальный счетчик перед отправкой заданий. Как только задание завершено, ваш обратный вызов увеличивает глобальный счетчик. Когда счетчик достигает 4 (ваше количество заданий), обратный вызов знает, что он обработал последний результат. Затем фиктивное сообщение отправляется в очередь. Ваша основная программа ожидает в Queue.get() появления чего-то там.

Это позволяет вашей основной программе блокироваться до завершения всех заданий, не закрывая пул.

Если вы замените multiprocessing.Pool на ProcessPoolExecutor из concurrent.futures, вы можете пропустить эту часть и использовать

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

для блокировки, пока все отправленные задачи не будут завершены. С функциональной точки зрения между ними нет никакой разницы. Метод concurrent.futures короче на пару строк, но результат точно такой же.

person Hannu    schedule 09.03.2018
comment
Вы знаете способ отладки кода? Потому что это все еще неправильно даже с вашим обновлением. Странно думать, что если я закрываю, присоединяюсь к каждому временному шагу, он работает правильно даже с обновлением. И все еще не работает должным образом для глобального пула. В чем может быть ошибка, подскажите? - person Bekromoularo; 13.03.2018
comment
Я бы просто добавил операторы печати. Я бы начал с печати счетчика в вашем log_results сразу после того, как он был увеличен. Когда вы говорите неправильно, что вы имеете в виду? Вы получаете исключение или он делает что-то, чего не должен делать? - person Hannu; 13.03.2018
comment
Просто результат не ожидаемый. Но когда я создаю новый пул каждый раз, это так. Я просто меняю место создания пула, не более того. Вот почему я говорю, что это странно - person Bekromoularo; 13.03.2018
comment
Одна вещь, которую вы могли бы попытаться сузить, — это установить maxtasksperchild=1 в свой пул. Это не то, что вы хотите получить, так как он будет выходить и воссоздавать рабочих после каждой задачи, но это может сузить проблему. Я также заметил, что вы изменяете переменные, которые выглядят глобальными в ваших потоках, но вы не используете никаких блокировок. Другая возможность состоит в том, что ваши операторы присваивания X создают локальную копию объекта, и вы продолжаете работать над этим. Просто догадываюсь, поскольку я не знаком с numpy и матричными операциями. - person Hannu; 13.03.2018
comment
Я не использую блокировку при изменении, потому что процесс обновляет только одну строку матрицы. Итак, 4 процесса обновляют 4 разные вещи. - person Bekromoularo; 13.03.2018
comment
Вы уверены, что это безопасно? Это может быть, но если Python/numpy делает что-то невидимое внутри (например, создает копию, вносит изменения в копию, записывает копию обратно в исходный сегмент памяти), это не будет потокобезопасным, даже если вы намеренно только изменяете подмножество массива. Но это всего лишь мысль. - person Hannu; 14.03.2018
comment
Вы можете попробовать добавить блокировки и посмотреть, решит ли это вашу проблему. Еще одна проверка, которую нужно добавить, — поместить print (id(X), id(b)) в ваш worker в качестве первой и последней строк, чтобы убедиться, что ваши глобальные переменные остаются прежними, и им не присваиваются локальные воплощения с тем же именем. Извините, я не могу больше помочь. - person Hannu; 14.03.2018
comment
Я попытался напечатать идентификатор, как вы мне сказали. И когда я использую глобальный пул (как я хочу), возникает проблема с идентификатором (b). Так что есть ошибка. Но использование замков ничего не меняет - person Bekromoularo; 14.03.2018
comment
Как я уже сказал, я не знаю, как там работает внутреннее устройство, но происходит так, что ваш b становится локальной копией. Я ничем больше не смогу помочь, но, по крайней мере, теперь вы знаете, с чем имеете дело. Вы можете попробовать что-то вроде этого: stackoverflow.com /questions/1540049/, где вы присваиваете значения b одно за другим в цикле и смотрите, поможет ли это. - person Hannu; 14.03.2018
comment
Также подумайте, какая разница в определениях X и b, если X работает, а b нет. Определены ли они в одной и той же части кода? Являются ли они объектами одного вида? И т.п. - person Hannu; 14.03.2018
comment
b и X определены в одной и той же части кода и представляют собой просто двумерные матрицы с плавающей запятой. При каждом обновлении b обновляется новым наблюдением, а X обновляется соответственно обновлению b. Когда программа закончена, b — правильное b, а X — что-то действительно неправильное. Дело в том, что локальная копия не влияет на b в конце, а влияет только на X. Это действительно странно. И другое, что еще более странно, это то, что если я не использую глобальный пул, ничего не пойдет не так. В любом случае спасибо за вашу ценную помощь. - person Bekromoularo; 14.03.2018
comment
Я тоже попробовал ProcessPoolExecutor, но похоже, что проблема та же. Я обновил свой код, чтобы не обновлять глобальную переменную. Но опять происходит то же самое. Есть ли у вас какие-либо идеи, почему он работает с пулом, который создается и закрывается на каждом временном шаге, а не с глобальным пулом, ожидающим поступления заданий? Первый путь все еще параллелен? Это очень расстраивает. - person Bekromoularo; 15.03.2018