Как мога да паралелизирам конвейер от генератори/итератори в Python?

Да предположим, че имам код на Python като следния:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Този код чете всеки ред от входния файл, преминава през няколко функции и записва изхода в изходния файл. Сега Аз знам, че функциите process_line, process_item и generate_output_line никога няма да си пречат една на друга и нека приемем, че входните и изходните файлове са на отделни дискове, така че четенето и записът да не пречат на всеки друго.

Но Python вероятно не знае нищо от това. Разбирам, че Python ще прочете един ред, ще приложи всяка функция на свой ред и ще запише резултата в изхода, а след това ще прочете втория ред само след като изпрати първия ред в изхода, така че че втората линия не влиза в тръбопровода, докато първата не излезе. Разбирам ли правилно как ще протича тази програма? Ако това е начинът, по който работи, има ли някакъв лесен начин да се направи така, че няколко реда да могат да бъдат в конвейера едновременно, така че програмата да чете, пише и обработва всяка стъпка паралелно?


person Ryan C. Thompson    schedule 16.04.2011    source източник
comment
Мисля, че трябва да добавите многопоточност за това, така че бих казал не - нямате многопоточност в CPython и в други реализации, вероятно не си струва да имате сто реда вместо пет.   -  person    schedule 16.04.2011


Отговори (2)


Не можете наистина да паралелизирате четенето от или записването във файлове; те ще бъдат вашето тясно място в крайна сметка. Сигурни ли сте, че вашето тясно място тук е процесорът, а не I/O?

Тъй като вашата обработка не съдържа зависимости (според вас), е тривиално лесно да се използва Многопроцесорна обработка на Python. Пул клас.

Има няколко начина да напишете това, но по-лесният w.r.t. отстраняването на грешки е да се намерят независими критични пътища (най-бавната част от кода), които ще накараме да работят паралелно. Да приемем, че е process_item.

…И това е всъщност. Код:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Не съм го тествал, но това е основната идея. Imap методът на Pool гарантира, че резултатите се връщат в правилния ред.

person Samat Jain    schedule 16.04.2011
comment
Изпълнявам това на сървър с наистина бърз I/O. И мен ме изненада, че I/O не беше тясното място. Освен това мога да чета и пиша едновременно с малко забавяне на двете. Има ли причина да не мога или не трябва да използвам p.imap и на трите стъпки на обработка? Имам 48 ядра, с които да си играя, така че ако успея да паралелизирам добре, може би ще успея отново да направя I/O бавната стъпка. - person Ryan C. Thompson; 16.04.2011
comment
48 ядра са... много. Толкова много, че разходите за заключване/семафор от мултипроцесорния модул на Python стават значителни. Може би вместо това трябва да търсите Hadoop? - person Samat Jain; 19.04.2011
comment
За да отговоря на въпроса ви, зависи от вашия код. Предполагайки от това, което сте дали, вероятно бих комбинирал всичките 3 функции в една (която взема ред и връща изходния ред) и бих го предал на imap. - person Samat Jain; 19.04.2011
comment
Ще има ли по-малко режийни разходи, ако не се грижа за запазването на реда на редовете? - person Ryan C. Thompson; 19.04.2011

има ли някакъв лесен начин да направите така, че няколко реда да могат да бъдат в тръбопровода наведнъж

Написах библиотека, която да прави точно това: https://github.com/michalc/threaded-buffered-pipeline, който итерира всеки итерируем в отделна нишка.

И така, какво беше

input = open("input.txt")

x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)

output = open("output.txt", "w")
output.writelines(z)

става

from threaded_buffered_pipeline import buffered_pipeline

input = open("input.txt")

buffer_iterable = buffered_pipeline()
x = buffer_iterable((process_line(line) for line in input))
y = buffer_iterable((process_item(item) for item in x))
z = buffer_iterable((generate_output_line(item) + "\n" for item in y))

output = open("output.txt", "w")
output.writelines(z)

Колко действителен паралелизъм добавя това зависи от това какво всъщност се случва във всяка итерируема и колко CPU ядра имате/колко са заети.

Класическият пример е GIL на Python: ако всяка стъпка е сравнително тежка за процесора и използва само Python, тогава няма да се добави много паралелизъм и това може да не е по-бързо от серийната версия. От друга страна, ако всеки от тях е мрежов IO тежък, тогава мисля, че е вероятно да бъде по-бърз.

person Michal Charemza    schedule 28.12.2020