Как обмениваться большими двоичными данными (~ 50 МБ) между внешними процессами, которые обычно записывают файлы в Python

Моя конечная цель — иметь возможность объединять процессы командной строки, которые работают с файлами, не касаясь диска. Это возможно? Я не могу использовать stdin/stdout, потому что некоторые из процессов, которые мне нужно запустить, принимают только файлы (иногда более одного) в качестве входных данных. Мне удалось сделать это с помощью FIFO и Popen с небольшими файлами в Python, но не с большими файлами (в масштабе МБ). Вот фрагмент кода, который я использую для проверки этой функциональности.

fifo1 = os.getcwd()+'/fifo1.nii'
fifo2 = os.getcwd()+'/fifo2.nii'

command = 'diff \''+fifo1+'\' \''+fifo2+'\''

os.mkfifo(fifo1)
os.mkfifo(fifo2)

with open('1_brain.nii', 'rb', 0) as r:
    s1 = r.read()
with open('run1.nii', 'rb', 0) as r:
    s2 = r.read()

def write(fifo, s):
    with open(fifo, 'wb', 0) as f:
        f.write(s)

writer1 = Thread(target=write, args=[fifo1, s1])
writer1.start()

writer2 = Thread(target=write, args=[fifo2, s2])
writer2.start()

proc = Popen(shlex.split(command), stdout=PIPE)

try:
    while proc.poll() == None:
        continue
    print proc.communicate()[0]
except:
    if proc.poll() == None:
        proc.kill()
    os.unlink(fifo1)
    os.unlink(fifo2)
    raise

os.unlink(fifo1)
os.unlink(fifo2)

Это работает с небольшими текстовыми файлами, но когда я запускаю его с большими двоичными файлами, я получаю ошибку сломанного канала в моих потоках записи, поэтому кажется, что конец чтения (процесс сравнения) закрывается до завершения записи. Я получил процессы чтения файлов для чтения стандартного ввода с помощью символической ссылки на дескриптор файла стандартного ввода, но я не могу использовать стандартный ввод, так как иногда мне нужно несколько входных данных. Есть ли способ заставить FIFO работать, или можно ли создать свои собственные дескрипторы файлов, которые работают как стандартный ввод для отправки данных в процессы? Пожалуйста, дайте мне знать, если что-то из этого неясно! Спасибо.


person Shark    schedule 11.09.2015    source источник
comment
См. pdf-файл на подсказках Python Generator для системных программистов   -  person Peter Wood    schedule 11.09.2015
comment
(1) 50 МБ — это немного для компьютера, поддерживающего CPython. Это указывает на ошибку в вашем коде (2) Почему вы читаете файлы в память только для того, чтобы сбросить их в именованные каналы, вместо того, чтобы напрямую передавать файлы дочернему процессу? Создайте полный минимальный пример кода (используйте фиктивный скрипт Python в качестве дочернего процесса) (3) Отбросьте фиктивный цикл while proc.poll() == None, используйте только proc.communicate() вместо. (4) Не связано: вы можете использовать /dev/fd/N имен файлов вместо именованных каналов   -  person jfs    schedule 11.09.2015
comment
Я пробовал вариант кода из предыдущего комментария с вводом 5 ГБ (в 100 раз больше, чем в вашем случае). Это работает нормально.   -  person jfs    schedule 12.09.2015
comment
Спасибо за ответ! Чтобы ответить на ваш вопрос, я делаю дамп в именованные каналы, потому что моей конечной целью является объединение нескольких процессов в цепочку, поэтому идея состоит в том, что один процесс будет делать дамп в канал, а следующий будет его читать. Будут ли файловые дескрипторы /dev/fd/ вести себя как конвейер, то есть будут ли они способны к поведению, которое я только что описал (выгрузка из одного процесса, затем чтение в следующем)?   -  person Shark    schedule 13.09.2015
comment
@Shark: да, пример кода из ссылки в моем первом комментарии демонстрирует именно это.   -  person jfs    schedule 15.09.2015