Как быстро выйти из строя с многопроцессорностью?

Следующий код — моя первая попытка быстро потерпеть неудачу, когда в функции гиперпараметра выдается Exception.

К сожалению, сначала обрабатываются все данные, прежде чем вызывающая сторона получит исключение.

Что я могу сделать, чтобы весь процесс немедленно прекращался, если в вызываемой функции возникает ошибка (чтобы я мог быстрее исправить свою ошибку кодирования и т. д. и не ждать, пока все различные комбинации параметров будут обработаны/оптимизированы)?

Код:

from sklearn.model_selection import ParameterGrid
from multiprocessing import Pool
from enum import Enum

var1 = 'var1'
var2 = 'var2'
abc = [1, 2]
xyz = list(range(1_00_000))
pg = [{'variant': [var1],
       'abc': abc,
       'xyz': xyz, },
      {'variant': [var2],
       'abc': abc, }]
parameterGrid = ParameterGrid(pg)
myTemp = list(parameterGrid)

print('len(parameterGrid):', len(parameterGrid))


def myFunc(myParam):
    if myParam['abc'] == 1:
        raise ValueError('error thrown')
    print(myParam)


pool = Pool(1)
myList = pool.map(myFunc, parameterGrid)

Что приводит к:

len(parameterGrid): 200002
{'abc': 2, 'variant': 'var1', 'xyz': 2}
{'abc': 2, 'variant': 'var1', 'xyz': 3}
{'abc': 2, 'variant': 'var1', 'xyz': 4}
{'abc': 2, 'variant': 'var1', 'xyz': 5}
{'abc': 2, 'variant': 'var1', 'xyz': 6}
.
.
.
{'abc': 2, 'variant': 'var1', 'xyz': 99992}
{'abc': 2, 'variant': 'var1', 'xyz': 99993}
{'abc': 2, 'variant': 'var1', 'xyz': 99994}
{'abc': 2, 'variant': 'var1', 'xyz': 99995}
{'abc': 2, 'variant': 'var1', 'xyz': 99996}
{'abc': 2, 'variant': 'var1', 'xyz': 99997}
{'abc': 2, 'variant': 'var1', 'xyz': 99998}
{'abc': 2, 'variant': 'var1', 'xyz': 99999}
ValueError: error thrown

person user7468395    schedule 23.07.2019    source источник
comment
Здесь вам понадобится pool.apply_async() с обратным вызовом ошибки: Многопроцессорность Python: прервать сопоставление при первой дочерней ошибке   -  person Darkonaut    schedule 23.07.2019


Ответы (2)


Как я вижу, обрабатываются не все данные. Только для случая 'abc' = 2 это проходит. Как только myFunc получает параметр с 'abc' = 2, он генерирует исключение. Выглядит правильно, не так ли? Вы можете проверить все свои параметры в сетке перед запуском карты. Он оставляет только значения, которые действительны/подходят для вас

myTemp_2 = filter(lambda x: x['abc'] != 1, myTemp)

Оставляет только подходящие вам значения

person GolovDanil    schedule 23.07.2019

Чтобы аварийно завершить весь Pool процессов (надеюсь, вам нужно такое условие для целей тестирования):

...
def myFunc(myParam):
    if myParam['abc'] == 1:
        print('error occurred')
        pool.terminate()    # accessed globally
    print(myParam)

if __name__ == '__main__':
    pool = Pool(1)
    myList = pool.map(myFunc, parameterGrid)

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.terminate

person RomanPerekhrest    schedule 23.07.2019
comment
Возможно, я что-то упускаю, но # доступен глобально: как? При использовании метода запуска spawn в рабочем процессе не создается пул. С fork и forkserver есть не полностью инициализированный пул, которому не присвоено имя pool (или любое другое имя) в воркерах. - person shmee; 23.07.2019
comment
@shmee, прочитайте о том, что происходит с дочерними процессами при условии if __name__ == '__main__': В Unix, использующем метод запуска fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Но это больше для Unix (на самом деле я не рассматриваю Windows) - person RomanPerekhrest; 23.07.2019
comment
@shmee, вот похожая тема stackoverflow.com/a/36962624/3185459 - person RomanPerekhrest; 23.07.2019
comment
Я не уверен, что ваш пример применим. Рабочие разветвляются во время инициализации пула. Присвоение объекту пула имени pool происходит после завершения метода __init__ пула, когда рабочие процессы уже активны. Объекты Event в первом примере кода ответа, на который вы ссылаетесь, доступны в рабочих процессах, поскольку они были созданы до создания экземпляра пула. Если вы переместите их экземпляр после экземпляра пула, использование их в функции вызовет ошибку NameError, точно так же, как здесь используется pool. - person shmee; 23.07.2019
comment
Во втором примере кода этого ответа функция, вызывающая terminate в пуле, передается как обратный вызов. Это выполняется в основном процессе, поэтому в этом случае пул полностью инициализируется и назначается соответствующему имени. - person shmee; 23.07.2019