Като се има предвид N генератора, възможно ли е да се създаде генератор, който ги изпълнява в паралелни процеси и дава zip на тези генератори?

Да предположим, че имам N генератора gen_1, ..., gen_N, където всеки от тях ще даде същия брой стойности. Бих искал генератор gen такъв, че да изпълнява gen_1, ..., gen_N в N паралелни процеса и да дава (next(gen_1), next(gen_2), ... next(gen_N))

Тоест бих искал да имам:

def gen():
   yield (next(gen_1), next(gen_2), ... next(gen_N))

по такъв начин, че всеки gen_i да работи на свой собствен процес. Възможно ли е да се направи това? Опитах се да направя това в следния фиктивен пример без успех:

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

Въпреки това получавам грешката TypeError: cannot pickle 'generator' object.

РЕДАКТИРАНЕ:

Промених малко отговора на @darkonaut, за да отговаря на нуждите ми. Публикувам го, ако някой от вас го намери за полезен. Първо дефинираме няколко полезни функции:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    chunks = grouper(generators, len(generators) // n_splits + 1)

    return [zip_longest(*chunk) for chunk in chunks]

Следният клас е отговорен за разделянето на произволен брой генератори на n (брой процеси) партиди и обработването им, което води до желания резултат:

import multiprocessing as mp

class GeneratorParallelProcessor:
SENTINEL = 'S'

def __init__(self, generators, n_processes = 2 * mp.cpu_count()):
    self.n_processes = n_processes
    self.generators = split_generators_into_batches(list(generators), n_processes)
    self.queue = mp.SimpleQueue()
    self.barrier = mp.Barrier(n_processes + 1)
    self.sentinels = [self.SENTINEL] * n_processes

    self.processes = [
        mp.Process(target=self._worker, args=(self.barrier, self.queue, gen)) for gen in self.generators
    ]

def process(self):
    for p in self.processes:
        p.start()

    while True:
        results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
        if results != self.sentinels:
            yield results
            self.barrier.wait()
        else:
            break

    for p in self.processes:
        p.join()

def _worker(self, barrier, queue, generator):
    for x in generator:
        queue.put(x)
        barrier.wait()
    queue.put(self.SENTINEL)

За да го използвате, просто направете следното:

parallel_processor = GeneratorParallelProcessor(generators)

    for grouped_generator in parallel_processor.process():
        output_handler(grouped_generator)

person creyesk    schedule 09.10.2020    source източник
comment
Ако вече имате генераторните обекти, няма общ начин да ги трансплантирате в друг процес. Ще трябва да започнете всеки Process с целева функция, която ще създаде генератора там.   -  person jasonharper    schedule 10.10.2020
comment
Дори и да успеете да направите това, GIL вероятно ще им попречи да работят паралелно.   -  person Mark Ransom    schedule 10.10.2020
comment
@MarkRansom Той използва multiprocessing, а не нишки, така че не мисля, че GIL се прилага тук.   -  person thegamecracks    schedule 10.10.2020
comment
@thegamecracks съжалявам, пропуснах това; прав си, че ще премахне GIL от уравнението. Но това прави обмена на данни по-сложен.   -  person Mark Ransom    schedule 10.10.2020


Отговори (2)


Възможно е да получите такъв Unified Parallel Generator (UPG) (опит за измисляне на име) с известни усилия, но както @jasonharper вече спомена, определено трябва да сглобите под-генераторите в дъщерния елемент- процеси, тъй като работещ генератор не може да бъде маринован.

Моделът по-долу може да се използва повторно, като само функцията за генериране gen() е персонализирана за този пример. Дизайнът използва multiprocessing.SimpleQueue за връщане на резултати от генератора към родителя и multiprocessing.Barrier за синхронизиране.

Извикването на Barrier.wait() ще блокира извикващия (нишката във всеки процес), докато броят на посочените parties не извика .wait(), след което всички нишки, чакащи в момента на Barrier, се освобождават едновременно. Използването на Barrier тук гарантира, че по-нататъшните резултати от генератора започват да се изчисляват едва след като родителят е получил всички резултати от итерация, което може да е желателно, за да се запази общата консумация на памет под контрол.

Броят на използваните паралелни работници се равнява на броя кортежи с аргументи, които предоставяте в рамките на gen_args_tuples-iterable, така че gen_args_tuples=zip(range(4)) ще използва например четири работници. Вижте коментарите в кода за повече подробности.

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Изход:

WORKER-1 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-2 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
WORKER-0 sending item.
('c0', 'c1', 'c2', 'c3')

Process finished with exit code 0
person Darkonaut    schedule 10.10.2020
comment
Благодаря ти много. Това върши работа! Мисля, че това е много полезно за интензивни изчислителни процеси, при които не искате да не започвате отначало при всяка итерация. Библиотека, която помага на python да използва мултипроцесор по по-прост начин, би била изключително полезна. - person creyesk; 10.10.2020
comment
@creyesk Няма за какво. Да, IIRC, ти не си първият, който иска нещо подобно. - person Darkonaut; 11.10.2020

Избрах малко по-различен подход, можете съответно да промените примера по-долу. Така че някъде в основния скрипт инициализирайте пула според вашите нужди, имате нужда само от тези 2 реда

from multiprocessing import Pool

pool = Pool(processes=4)

тогава можете да дефинирате генераторна функция като тази: (Имайте предвид, че входът на генераторите се предполага, че е всеки итерируем, съдържащ всички генератори)

def parallel_generators(generators, pool):
results = ['placeholder']
while len(results) != 0:
    batch = pool.map_async(next, generators)  # defines the next round of values
    results = list(batch.get)  # actual calculation done here
    yield results
return 

Ние дефинираме условието за резултати в цикъла while по този начин, защото картографираните обекти с next и генератори връщат празен списък, когато генераторите спрат да произвеждат стойности. Така че в този момент просто прекратяваме паралелния генератор.

РЕДАКТИРАНЕ

Така че очевидно многопроцесорният пул и картата не работят добре с генератори, карайки горния код да не работи по предназначение, така че не използвайте до по-късна актуализация.

Що се отнася до грешката pickle, изглежда, че някои обвързани функции не поддържат pickle, което е необходимо в мултипроцесорната библиотека, за да се прехвърлят обекти и функции, за заобиколно решение библиотеката pathos mutliprocessing използва dill, което решава нуждата от pickle и е опция, която бихте могли искате да опитате, търсейки грешката си в Stack Overflow, можете също да намерите някои по-сложни решения с персонализиран код за избиране на необходимите функции.

person jimakr    schedule 10.10.2020
comment
Това е хубав подход. Ако обаче се опитам да дам batch.get(), тогава той всъщност изпълнява картата async и получавам същото TypeError: cannot pickle 'generator' object. Пропускам ли нещо? - person creyesk; 10.10.2020