Как я могу распараллелить конвейер генераторов/итераторов в Python?

Предположим, у меня есть некоторый код Python, подобный следующему:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Этот код считывает каждую строку из входного файла, прогоняет ее через несколько функций и записывает вывод в выходной файл. Теперь я знаю, что функции process_line, process_item и generate_output_line никогда не будут мешать друг другу, и давайте предположим, что входной и выходной файлы находятся на разных дисках, так что чтение и запись не будут мешать друг другу Другие.

Но Python, вероятно, ничего этого не знает. Насколько я понимаю, Python будет читать одну строку, применять каждую функцию по очереди и записывать результат в вывод, а затем он будет читать вторую строку только после отправки первой строки в вывод, поэтому что вторая линия не входит в конвейер, пока не выйдет первая. Я правильно понимаю, как будет течь эта программа? Если это так, то есть ли какой-нибудь простой способ сделать так, чтобы несколько строк одновременно находились в конвейере, чтобы программа читала, писала и обрабатывала каждый шаг параллельно?


person Ryan C. Thompson    schedule 16.04.2011    source источник
comment
Я думаю, вам нужно добавить многопоточность для этого, поэтому я бы сказал нет - у вас нет многопоточности в CPython и в других реализациях, возможно, не стоит иметь сто строк вместо пяти.   -  person    schedule 16.04.2011


Ответы (2)


Вы не можете распараллелить чтение или запись в файлы; это будет вашим узким местом, в конечном счете. Вы уверены, что узким местом здесь является ЦП, а не ввод-вывод?

Поскольку ваша обработка не содержит зависимостей (по вашему мнению), использовать тривиально просто. Класс multiprocessing.Pool Python.

Есть несколько способов написать это, но проще w.r.t. отладка заключается в том, чтобы найти независимые критические пути (самая медленная часть кода), которые мы заставим работать параллельно. Предположим, что это process_item.

…И это все, на самом деле. Код:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)

Я не проверял это, но это основная идея. Метод imap пула гарантирует, что результаты возвращаются в правильном порядке.

person Samat Jain    schedule 16.04.2011
comment
Я запускаю это на сервере с действительно быстрым вводом-выводом. Меня тоже удивило, что ввод-вывод не был узким местом. Кроме того, я могу одновременно читать и писать с небольшим замедлением. Есть ли причина, по которой я не могу или не должен использовать p.imap на всех трех этапах обработки? У меня есть 48 ядер, с которыми можно поиграться, поэтому, если я смогу хорошо распараллелить, я, возможно, снова смогу сделать ввод-вывод медленным шагом. - person Ryan C. Thompson; 16.04.2011
comment
48 ядер — это… много. Настолько много, что накладные расходы на блокировку/семафор от многопроцессорного модуля Python становятся значительными. Может быть, вам стоит взглянуть на Hadoop? - person Samat Jain; 19.04.2011
comment
Чтобы ответить на ваш вопрос, это зависит от вашего кода. Судя по тому, что вы дали, я бы, вероятно, объединил все 3 функции в одну (которая принимает строку и возвращает строку вывода) и передал ее в imap. - person Samat Jain; 19.04.2011
comment
Будет ли меньше накладных расходов, если я не забочусь о сохранении порядка строк? - person Ryan C. Thompson; 19.04.2011

есть ли простой способ сделать так, чтобы несколько строк могли быть в конвейере одновременно

Я написал библиотеку для этого: https://github.com/michalc/threaded-buffered-pipeline, который перебирает каждую итерацию в отдельном потоке.

Так что же было

input = open("input.txt")

x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)

output = open("output.txt", "w")
output.writelines(z)

становится

from threaded_buffered_pipeline import buffered_pipeline

input = open("input.txt")

buffer_iterable = buffered_pipeline()
x = buffer_iterable((process_line(line) for line in input))
y = buffer_iterable((process_item(item) for item in x))
z = buffer_iterable((generate_output_line(item) + "\n" for item in y))

output = open("output.txt", "w")
output.writelines(z)

Сколько фактического параллелизма это добавляет, зависит от того, что на самом деле происходит в каждой итерации, и от того, сколько ядер ЦП у вас есть/насколько они заняты.

Классическим примером является Python GIL: если каждый шаг довольно сильно загружает процессор и использует только Python, тогда не будет добавлено много параллелизма, и это может быть не быстрее, чем последовательная версия. С другой стороны, если каждый из них является тяжелым сетевым вводом-выводом, то я думаю, что он, вероятно, будет быстрее.

person Michal Charemza    schedule 28.12.2020