concurrent.futures не распараллеливает запись

У меня есть список dataframe_chunk, который содержит фрагменты очень большого фрейма данных pandas. Я хотел бы записать каждый фрагмент в другой CSV и сделать это параллельно. Однако я вижу, что файлы записываются последовательно, и я не уверен, почему это так. Вот код:

import concurrent.futures as cfu

def write_chunk_to_file(chunk, fpath):  
    chunk.to_csv(fpath, sep=',', header=False, index=False)

pool = cfu.ThreadPoolExecutor(N_CORES)

futures = []
for i in range(N_CORES):
    fpath = '/path_to_files_'+str(i)+'.csv'
    futures.append(pool.submit( write_chunk_to_file(dataframe_chunk[i], fpath)))

for f in cfu.as_completed(futures):
    print("finished at ",time.time())

Любые подсказки?


person elelias    schedule 19.07.2016    source источник


Ответы (1)


Одна вещь, которая указана в Python 2.7.x threading, но отсутствует в 3.x заключается в том, что Python не может достичь истинного параллелизма с помощью библиотеки threading — одновременно будет выполняться только один поток.

Вам следует попробовать использовать concurrent.futures с ProcessPoolExecutor, который использует отдельные процессы для каждого задания и, следовательно, может обеспечить настоящий параллелизм на многоядерном процессоре.

Обновить

Вот ваша программа, адаптированная для использования библиотеки multiprocessing:

#!/usr/bin/env python3

from multiprocessing import Process

import os
import time

N_CORES = 8

def write_chunk_to_file(chunk, fpath):  
    with open(fpath, "w") as f:
      for x in range(10000000):
        f.write(str(x))

futures = []

print("my pid:", os.getpid())
input("Hit return to start:")

start = time.time()
print("Started at:", start)

for i in range(N_CORES):
    fpath = './tmp/file-'+str(i)+'.csv'
    p = Process(target=write_chunk_to_file, args=(i,fpath))
    futures.append(p)

for p in futures:
  p.start()

print("All jobs started.")

for p in futures:
  p.join()

print("All jobs finished at ",time.time())

Вы можете отслеживать задания с помощью этой команды оболочки в другом окне:

while true; do clear; pstree 12345; ls -l tmp; sleep 1; done

(Замените 12345 идентификатором процесса, созданным сценарием.)

person ErikR    schedule 19.07.2016
comment
Ответ обновлен - мне больше повезло с использованием multiprocessing напрямую. - person ErikR; 19.07.2016