dask.delayed KeyError с распределенным планировщиком

У меня есть функция interpolate_to_particles, написанная на c и обернутая ctypes. Я хочу использовать dask.delayed для выполнения серии вызовов этой функции.

Код успешно работает без dask

# Interpolate w/o dask
result = interpolate_to_particles(arg1, arg2, arg3)

и с распределенным расписанием в режиме single-threaded

# Interpolate w/ dask
from dask.distributed import Client
client = Client()
result = dask.delayed(interpolate_to_particles)(arg1, arg2, arg3)
result_c = result.compute(scheduler='single-threaded')

но если я вместо этого позвоню

result_c = result.compute()

Я получаю следующее KeyError:

> Traceback (most recent call last):   File
> "/path/to/lib/python3.6/site-packages/distributed/worker.py",
> line 3287, in dumps_function
>     result = cache_dumps[func]   File "/path/to/lib/python3.6/site-packages/distributed/utils.py",
> line 1518, in __getitem__
>     value = super().__getitem__(key)   File "/path/to/lib/python3.6/collections/__init__.py",
> line 991, in __getitem__
>     raise KeyError(key) KeyError: <function interpolate_to_particles at 0x1228ce510>

Журналы рабочих процессов, доступ к которым осуществляется с панели инструментов dask, не предоставляют никакой информации. Собственно, я не вижу информации, чтобы рабочие что-то сделали, кроме запуска.

Любые идеи о том, что может происходить, или предлагаемые инструменты, которые я могу использовать для дальнейшей отладки? Спасибо!


person elltrain    schedule 06.04.2020    source источник
comment
У вас есть минимальный пример? stackoverflow.com/help/minimal-reproducible-example. Вам нужно создать клиента, или ошибка возникает только с однопоточным планировщиком?   -  person TomAugspurger    schedule 07.04.2020
comment
@TomAugspurger Спасибо за комментарий. Интересно, что построенные мной минимальные примеры (на основе сообщение 1 и сообщение 2) не страдают той же проблемой. Ошибка возникает, только если я создаю клиента. То есть вызов result = dask.delayed(interpolate_to_particles)(...), за которым следует result_c = result.compute(), работает по назначению.   -  person elltrain    schedule 07.04.2020


Ответы (1)


Учитывая ваши комментарии, похоже, что ваша функция плохо сериализуется. Чтобы проверить это, вы можете попробовать обработать функцию в одном процессе и попытаться распаковать ее в другом.

>>> import pickle
>>> print(pickle.dumps(interpolate_to_particles))
b'some bytes printed out here'

А потом в другом процессе

>>> import pickle
>>> interpolate_to_particles = pickle.loads(b'the same bytes you had before')

Если это не сработает, вы поймете, что это ваша проблема. Я бы посоветовал вам поискать «как убедиться, что функции ctypes сериализуемы» или что-то подобное, или задать другой вопрос с этой меньшей областью здесь, в Stack Overflow.

person MRocklin    schedule 11.04.2020
comment
Я подозреваю, что проблема в этом. Под указанной выше ошибкой KeyError находится сообщение During handling of the above exception, another exception occurred: ‹long traceback› ValueError: ctypes objects containing pointers cannot be pickled. Недавно я прочитал ваш пост и теперь понимаю что сериализация важна для параллелизма. Теперь я также понимаю, что сериализация объектов с помощью указателей проблематична. - person elltrain; 14.04.2020
comment
У меня такая же проблема, и травление работает нормально, что неудивительно, поскольку все мои аргументы - это целые числа, строки или слова. Ошибка возникла из ниоткуда, все работало нормально, а теперь нет. - person Nimitz14; 27.04.2020
comment
(немного другая настройка, используются SGECluster и Client::submit - person Nimitz14; 27.04.2020