Параллельные фьючерсы ждут подмножества задач

Я использую Python concurrent.futures рамки. Я использовал функцию map() для запуска параллельных задач как таковых:

def func(i):
    return i*i

list = [1,2,3,4,5]
async_executor = concurrent.futures.ThreadPoolExecutor(5)
results = async_executor.map(func,list)

Меня интересуют только первые n результаты, и я хочу остановить исполнителя после завершения первых n потоков, где n — число меньше размера входного списка. Есть ли способ сделать это в Python? Есть ли другая структура, на которую я должен обратить внимание?


person user2654096    schedule 02.06.2018    source источник
comment
Обязательно ли, чтобы решение могло завершать даже те функции, которые уже запущены, когда заканчиваются первые N? Или достаточно сказать, что новые задания не будут запускаться после завершения первых N, но уже запущенным заданиям будет разрешено завершиться, даже если мы проигнорируем их результаты? Если вам нужно первое, см.: stackoverflow.com/questions/42782953/ - это невозможно напрямую с помощью используемой вами библиотеки.   -  person John Zwinck    schedule 02.06.2018
comment
Ваше второе утверждение кажется подходящим для моего приложения. Я думаю, что могу использовать счетный семафор, а затем отменить исполнителя, как только семафор достигнет значения «n». Мой вопрос: если я это сделаю, каким именно будет результат «карты»? Будет ли он содержать только результаты первых «n» задач? Будет ли он блокироваться до тех пор, пока не будут вычислены результаты для уже запущенных, но еще не завершенных задач?   -  person user2654096    schedule 02.06.2018


Ответы (1)


Вы не можете использовать map() для этого, потому что он не дает ни способа прекратить ожидание результатов, ни способа получить отправленные фьючерсы и отменить их. Однако вы можете сделать это с помощью submit():

import concurrent.futures
import time

def func(i):
    time.sleep(i)
    return i*i


list = [1,2,3,6,6,6,90,100]
async_executor = concurrent.futures.ThreadPoolExecutor(2)
futures = {async_executor.submit(func, i): i for i in list}
for ii, future in enumerate(concurrent.futures.as_completed(futures)):
    print(ii, "result is", future.result())
    if ii == 2:
        async_executor.shutdown(wait=False)
        for victim in futures:
            victim.cancel()
        break

Приведенный выше код выполняется примерно 11 секунд — он выполняет задания [1,2,3,6,7], но не остальные.

person John Zwinck    schedule 02.06.2018