почему этот вызываемый объект не оценивается для каждого запуска внутри пафосного многопроцессорного пула?

Я пытаюсь распараллелить некоторый код, который использует частичные функции для генерации случайных чисел для моделирования, над которым я работаю. Со следующим кодом:

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example(func1, func2):
    sleep(1)
    [a, b] = [func1(), func2()]
    return (f"arg #1 is {round(a,2)}, arg #2 is {round(b,2)} at {datetime.now().time()}")

rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...

print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(example, argsToRun)
print("\n".join(results))

Я получаю следующий вывод...

running with a for loop...
arg #1 is 134.5, arg #2 is 232.45 at 11:58:17.025493
arg #1 is 213.38, arg #2 is 306.7 at 11:58:18.027038
arg #1 is 107.3, arg #2 is 347.19 at 11:58:19.028476

Running with itertools.starmap...
arg #1 is 167.7, arg #2 is 247.96 at 11:58:20.030238
arg #1 is 235.97, arg #2 is 318.02 at 11:58:21.031543
arg #1 is 140.41, arg #2 is 387.51 at 11:58:22.032727

Running with pathos.mp.starmap...
arg #1 is 120.24, arg #2 is 208.23 at 11:58:23.100251  
arg #1 is 220.24, arg #2 is 308.23 at 11:58:23.112206  
arg #1 is 120.24, arg #2 is 308.23 at 11:58:23.126050  

Проблема в том, что когда я распараллеливаю это, случайные функции НЕ оцениваются каждый раз по-разному. Они оцениваются только один раз (или результат каким-то образом повторно используется снова и снова...), если вы посмотрите на последний блок, значения из переданных случайных функций не меняются. Я поставил туда временные метки, чтобы убедить себя, что последний блок действительно выполнялся параллельно.

Я уверен, что это как-то связано с тем, когда и как оцениваются параметры кортежа для вызовов функций, но в этот момент я потерялся.

Цель очень высокого уровня состоит в том, чтобы иметь возможность создать (очень большой) список параметров для передачи в среду simPy, и чтобы пул выполнялся на них параллельно. Но пока я не разобрался, как заставить работать случайность, я застрял, делая это на 1/32 скорости, которая мне нужна.


person ljwobker    schedule 28.11.2019    source источник
comment
Генерация случайных чисел одним генератором в нескольких процессах одновременно, вероятно, не очень хорошая идея. Может быть, производитель предварительно заполняет очередь случайных значений, из которых потребляются процессы?   -  person Carcigenicate    schedule 28.11.2019
comment
Дело в том, чтобы иметь разные случайные распределения для разных прогонов. Я не понимаю, почему это было бы плохой идеей - мне нужна разница в числах, а не одни и те же числа каждый раз.   -  person ljwobker    schedule 28.11.2019
comment
Поскольку, как правило, если и объект не предназначен для многопроцессорной/поточной безопасности, манипулирование объектом из нескольких процессов/потоков одновременно может испортить внутреннее состояние объекта и вызвать странное поведение; как будто он дает один и тот же результат дважды.   -  person Carcigenicate    schedule 28.11.2019
comment
См. здесь, как это можно решить. Попробуйте создать несколько генераторов вместо использования глобального для всего.   -  person Carcigenicate    schedule 28.11.2019
comment
Извините, я не понимаю, как/почему этот механизм будет манипулировать объектом из нескольких процессов одновременно. Функция передается в конечное место, где она вызывается, а затем должна оцениваться там независимо, верно? Я уверен, что есть что-то, что мне не хватает, но я пока этого не вижу. (Извиняюсь!)   -  person ljwobker    schedule 29.11.2019
comment
Каждый раз, когда func1 и func2 вызываются внутри example, глобальный генератор случайных чисел создает значение, которое изменяет внутреннее состояние генератора. example передается starmap, а starmap потенциально может вызывать example несколько раз одновременно. Если example вызывается несколько раз одновременно, у вас потенциально может быть несколько одновременных вызовов func1 и func2, что означает, что генератор случайных чисел управляется несколькими процессами одновременно.   -  person Carcigenicate    schedule 29.11.2019
comment
И просто чтобы уточнить, в моем комментарии выше под starmap я имею в виду pool.starmap, а не itertools' starmap. Версия itertools не работает параллельно, так что все в порядке.   -  person Carcigenicate    schedule 29.11.2019
comment
ОК, я нашел stackoverflow.com/questions/10021882/, который выглядит полезным, но мне непонятно, где мне нужно предоставлять отдельные экземпляры ГСЧ. Должен ли я фактически передавать экземпляр local_random для каждого вызова функции? (Думаю, нет...)   -  person ljwobker    schedule 29.11.2019
comment
Возможно, будет проще предварительно сгенерировать случайные числа, сохранить их в многопроцессорной безопасной очереди (пафос должен иметь ее) и по мере необходимости получать example числа из очереди.   -  person Carcigenicate    schedule 29.11.2019
comment
Или, может быть, спросили по-другому: как правильно добавить потокобезопасную версию random в каждый экземпляр примера, вызываемый звездной картой? Возможно ли это без изменения подписи примера ()? И предварительная генерация имеет смысл, но в моем случае мне нужно использовать кучу разных дистрибутивов random.XXX в реальной программе.   -  person ljwobker    schedule 29.11.2019


Ответы (1)


Итак, у меня это работает, но я почти уверен, что это все еще полный взлом. В итоге я взял только параметры функций и передал их методу, который использует локальный экземпляр случайного...

#!/usr/bin/env python3 

import functools
import random
import pathos
from itertools import starmap
from time import sleep
from datetime import datetime

def example( func1, func2):
    sleep(0.5)
    [a, b] = [func1(), func2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")

def threadSafe(func1, func2):
    sleep(0.5)
    localRandom = random.Random()
    meth1 = getattr(localRandom, func1.func.__name__, func1.args)
    meth2 = getattr(localRandom, func2.func.__name__, func2.args)
    local_f1 = functools.partial(meth1, *func1.args)
    local_f2 = functools.partial(meth2, *func2.args)
    [a, b] = [local_f1(), local_f2()]
    return (f"values {round(a,2)},  {round(b,2)}   at {datetime.now().time()}")


rand1 = functools.partial(random.uniform, 100, 199)
rand2 = functools.partial(random.uniform, 200, 299)
rand3 = functools.partial(random.uniform, 300, 399)

argsToRun = [(rand1, rand2), (rand2, rand3), (rand1, rand3)]    # 3 ordered combinations...


print(f"running with a for loop...")
for args in argsToRun:
    result = example(*args)
    print(result)


print("\nRunning with itertools.starmap...")
results = starmap(example, argsToRun)
print("\n".join(results))


print("\nRunning threadSafe with pathos.mp.starmap...")    
with pathos.helpers.mp.Pool() as pool:
    results = pool.starmap(threadSafe, argsToRun)
print("\n".join(results))
person ljwobker    schedule 04.12.2019