Учитывая N генераторов, можно ли создать генератор, который запускает их в параллельных процессах и выдает zip этих генераторов?

Предположим, у меня есть N генераторов gen_1, ..., gen_N, каждый из которых даст одинаковое количество значений. Мне нужен генератор gen, который запускает gen_1,..., gen_N в N параллельных процессах и дает (next(gen_1), next(gen_2), ... next(gen_N))

То есть я хотел бы иметь:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

таким образом, что каждый gen_i работает в своем собственном процессе. Можно ли сделать это? Я попытался сделать это в следующем фиктивном примере без успеха:

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

Однако я получаю сообщение об ошибке TypeError: cannot pickle 'generator' object.

РЕДАКТИРОВАТЬ:

Я немного изменил ответ @darkonaut, чтобы он соответствовал моим потребностям. Я публикую его на случай, если кому-то из вас он покажется полезным. Сначала определим пару полезных функций:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)

    return [zip_longest(*chunk) for chunk in chunks]

Следующий класс отвечает за разбиение любого количества генераторов на n (количество процессов) пакетов и их обработку с получением желаемого результата:

import multiprocessing as mp

class GeneratorParallelProcessor:
SENTINEL = 'S'

def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
    self.n_processes = n_processes
    self.generators = split_generators_into_batches(list(generators), n_processes)
    self.queue = mp.SimpleQueue()
    self.barrier = mp.Barrier(n_processes + 1)
    self.sentinels = [self.SENTINEL] * n_processes

    self.processes = [
        mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
    ]

def process(self):
    for p in self.processes:
        p.start()

    while True:
        results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
        if results != self.sentinels:
            yield results
            self.barrier.wait()
        else:
            break

    for p in self.processes:
        p.join()

def _worker(self, barrier, queue, generator):
    for x in generator:
        queue.put(x)
        barrier.wait()
    queue.put(self.SENTINEL)

Чтобы использовать его, просто сделайте следующее:

parallel_processor = GeneratorParallelProcessor(generators)

    for grouped_generator in parallel_processor.process():
        output_handler(grouped_generator)

person creyesk    schedule 09.10.2020    source источник
comment
Если у вас уже есть объекты генератора, нет общего способа пересадить их в другой процесс. Вам нужно будет запустить каждый Process с целевой функцией, которая создаст там генератор.   -  person jasonharper    schedule 10.10.2020
comment
Даже если вам удастся это сделать, GIL, вероятно, предотвратит их параллельную работу.   -  person Mark Ransom    schedule 10.10.2020
comment
@MarkRansom Он использует multiprocessing, а не потоки, поэтому я не думаю, что GIL здесь применим.   -  person thegamecracks    schedule 10.10.2020
comment
@thegamecracks извините, я пропустил это; вы правы, что это удалит GIL из уравнения. Но это делает обмен данными более сложным.   -  person Mark Ransom    schedule 10.10.2020


Ответы (2)


Можно получить такой Unified Parallel Generator (UPG) (попытка придумать имя) с некоторыми усилиями, но, как уже упоминал @jasonharper, вам определенно нужно собрать подгенераторы внутри дочернего процесса. процессы, так как работающий генератор не может быть замаринован.

Приведенный ниже шаблон можно использовать повторно, только функция генератора gen() является пользовательской для этого примера. В проекте используется multiprocessing.SimpleQueue для возврата результатов генератора родительскому и multiprocessing.Barrier для синхронизации.

Вызов Barrier.wait() заблокирует вызывающую сторону (поток в любом процессе) до тех пор, пока указанное число parties не вызовет .wait(), после чего все потоки, ожидающие в данный момент Barrier, освобождаются одновременно. Использование Barrier здесь гарантирует, что дальнейшие результаты генератора начинают вычисляться только после родителя, получившего все результаты итерации, что может быть желательно для сохранения общего потребления памяти. в чеке.

Количество используемых параллельных рабочих процессов равно количеству аргументов-кортежей, которые вы предоставляете в gen_args_tuples-iterable, поэтому gen_args_tuples=zip(range(4)) будет использовать, например, четыре рабочих процесса. Дополнительные сведения см. в комментариях в коде.

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Вывод:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0
person Darkonaut    schedule 10.10.2020
comment
большое спасибо. Это делает трюк! Я думаю, что это очень полезно для процессов с интенсивными вычислениями, когда вы не хотите начинать все сначала на каждой итерации. Библиотека, которая помогает Python использовать многопроцессорность более простым способом, была бы чрезвычайно полезной. - person creyesk; 10.10.2020
comment
@creysk Не за что. Да, IIRC, ты не первый, кто просит о чем-то подобном. - person Darkonaut; 11.10.2020

Я выбрал немного другой подход, вы можете соответствующим образом изменить приведенный ниже пример. Итак, где-то в основном скрипте инициализируйте пул в соответствии с вашими потребностями, вам нужны именно эти 2 строки

from multiprocessing import Pool

pool = Pool(processes=4)

то вы можете определить функцию генератора следующим образом: (Обратите внимание, что ввод генераторов предполагается любой итерацией, содержащей все генераторы)

def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
    batch = pool.map_async(next, generators)  # defines the next round of values
    results = list(batch.get)  # actual calculation done here
    yield results
return 

Мы определяем условие результатов в цикле while таким образом, потому что объекты сопоставления с next и генераторами возвращают пустой список, когда генераторы перестают создавать значения. Итак, в этот момент мы просто отключаем параллельный генератор.

РЕДАКТИРОВАТЬ

Таким образом, очевидно, многопроцессорный пул и карта не работают с генераторами, из-за чего приведенный выше код не работает должным образом, поэтому не используйте до более позднего обновления.

Что касается ошибки рассола, кажется, что некоторые связанные функции не поддерживают рассол, который необходим в библиотеке многопроцессорной обработки для передачи объектов и функций, для обходного пути пафосная библиотека многопроцессорной обработки использует укроп, который устраняет необходимость рассола и является вариантом, который вы могли бы хотите попробовать, ищите в Stack Overflow свою ошибку, вы также можете найти более сложные решения с пользовательским кодом для выбора необходимых функций.

person jimakr    schedule 10.10.2020
comment
Это хороший подход. Однако, если я попытаюсь выполнить команду yield.get(), тогда карта фактически запустится асинхронно, и я получу то же самое TypeError: cannot pickle 'generator' object. Я что-то упустил? - person creyesk; 10.10.2020