Как создать будущее, только если доступен бесплатный рабочий

Я пытаюсь отправить информацию, извлеченную из строк большого файла, процессу, работающему на каком-то сервере.

Чтобы ускорить это, я хотел бы сделать это с несколькими потоками параллельно.

Используя бэкпорт Python 2.7 для concurrent.futures, я попробовал это:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

Однако это проблематично, потому что все фьючерсы отправляются мгновенно, так что моей машине не хватает памяти, потому что в память загружается весь файл.

Мой вопрос в том, есть ли простой способ представить новое будущее только тогда, когда доступен свободный работник.


person tobigue    schedule 12.09.2013    source источник
comment
Возможно, будет быстрее использовать функцию os.sendfile для отправки перевода файл через сокет.   -  person Bakuriu    schedule 12.09.2013
comment
Возможно, мне следовало упомянуть, что этот пример немного упрощен... на самом деле я отправляю информацию, извлеченную из каждой строки файла, в JSON-REST API.   -  person tobigue    schedule 12.09.2013


Ответы (1)


Вы можете перебирать фрагменты файла, используя

for chunk in zip(*[f]*chunksize):

(Это приложение рецепта группировщика, который собирает элементы из итератора f в группы размером chunksize.Примечание: при этом не используется весь файл сразу, так как zip возвращает итератор в Python3.)


import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

Вот вы в комментариях правильно указываете, что это не оптимально. Там может быть какой-то работник, который занимает много времени и задерживает целый кусок работы.

Обычно, если каждый вызов работника занимает примерно одинаковое количество времени, то это не имеет большого значения. Однако есть способ продвинуть дескриптор файла по требованию. Он использует threading.Condition, чтобы уведомить sprinkler о продвижении дескриптора файла.

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
    t.start()
for t in threads:
    t.join()
person unutbu    schedule 12.09.2013
comment
Мне нравится, куда это идет, но это страдает от исходной проблемы. Основной поток все еще пытается поставить в очередь входной файл. Как насчет одной очереди возврата, в которой каждый поток публикует сообщение «Готово». Основной поток может поместить, скажем, 16 элементов в очередь, а затем добавлять элементы только по мере получения сообщений «выполнено». - person tdelaney; 12.09.2013
comment
Хорошо, в этом случае вы можете обрабатывать файл по частям, используя for chunk in IT.izip(*[f]*chunksize). - person unutbu; 12.09.2013
comment
Спасибо за предложение! Я еще не смог проверить это, но это действительно может ускорить работу для меня. Однако это не оптимальное решение, потому что, насколько я вижу, все фьючерсы в чанке должны завершиться до того, как будет отправлен новый чанк (что может привести к простоям рабочих). Также обратите внимание, что я использую Python 2.7 и бэкпорт concurrent.futures, поэтому в этом случае, вероятно, придется использовать izip. - person tobigue; 12.09.2013
comment
Еще раз спасибо за подробный ответ! Второй подход кажется более оптимальным для решения проблемы, однако он также достаточно громоздкий. Публикуя вопрос, я надеялся, что будет более компактное/питоновское решение без использования очередей и т. д., как было в вашем первом случае. Поэтому я оставлю вопрос открытым еще на несколько дней, чтобы посмотреть, есть ли другой способ подойти к этому, а пока я воспользуюсь вашим первым предложением, так как в моем случае запросы действительно должны занимать примерно одинаковое количество времени, поэтому еще раз спасибо. - person tobigue; 13.09.2013