Как дождаться завершения всех задач в ThreadPoolExecutor, не завершая работу Executor?

Я не могу использовать shutdown() и awaitTermination(), потому что возможно, что новые задачи будут добавлены в ThreadPoolExecutor, пока он ожидает.

Итак, я ищу способ подождать, пока ThreadPoolExecutor не очистит свою очередь и не завершит все свои задачи, не останавливая добавление новых задач до этого момента.

Если это имеет значение, это для Android.

Спасибо

Обновление: много недель спустя после повторного просмотра я обнаружил, что модифицированный CountDownLatch в этом случае работал лучше для меня. Я сохраню ответ отмеченным, потому что он больше относится к тому, что я спросил.


person cottonBallPaws    schedule 14.10.2010    source источник
comment
Если вы согласны с добавлением новых задач, что произойдет, если они не будут завершены?   -  person Kylar    schedule 14.10.2010
comment
Я думаю, littleFluffyKitty просто хочет дождаться завершения старых задач.   -  person Thilo    schedule 14.10.2010
comment
Меня не особо беспокоит возможность того, что это никогда не закончится, потому что в этом случае что-то еще уже ужасно сломано. Если все остальное не помогает, я мог бы реализовать какой-то тайм-аут, но я в порядке, просто предполагая, что он закончится. Я хочу, чтобы он мог принимать новые задачи во время ожидания, или, другими словами, я хочу, чтобы новые задачи можно было добавлять после вызова ожидания.   -  person cottonBallPaws    schedule 14.10.2010
comment
Я думаю, это связанный вопрос: stackoverflow.com/questions/3402895/java-threadpool-usage. Возможно, вы захотите проверить ответы, перечисленные там ...   -  person sjlee    schedule 14.10.2010
comment
Я предложил другой, возможный ответ. Я до сих пор не понимаю, что вы имеете в виду, ожидая, пока все задачи опустошат очередь, но не желая препятствовать поступлению новых задач в очередь ... в какой-то момент вам нужно провести черту, а TPE не может рисовать линия для вас. Вызов shutdown() - это то, как вы создаете край, после которого новые задачи не могут быть отправлены; вызов awaitTermination() создает преимущество, гарантирующее, что вы заблокируете, пока не будут выполнены все ранее представленные задачи.   -  person andersoj    schedule 14.10.2010
comment
Вы просто хотите дождаться асинхронного уведомления о том, что очередь пуста, и сразу же закрыть ее? TPE (и в более общем плане семантика потоков), вероятно, не позволит вам сделать это, и если бы вы могли, было бы очень сложно рассуждать об этом со стороны отправителей задач.   -  person andersoj    schedule 14.10.2010
comment
См. Также stackoverflow.com/questions/1250643/   -  person rogerdpack    schedule 09.11.2012


Ответы (7)


Если вам интересно знать, когда завершается определенная задача или определенный пакет задач, вы можете использовать ExecutorService.submit(Runnable). Вызов этого метода возвращает объект Future, который может быть помещен в Collection, который ваш основной поток затем будет перебирать, вызывая Future.get() для каждого из них. Это приведет к тому, что ваш основной поток остановит выполнение, пока ExecutorService не обработает все Runnable задачи.

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}
person Tim Bender    schedule 14.10.2010
comment
+1 Кажется, это лучший способ. Однако это должно выполняться на уровне приложения, где вы отправляете задачу, а не на уровне службы исполнителя. - person Thilo; 14.10.2010
comment
+1 или используйте invokeAll () для пакета задач, чтобы дождаться завершения. См. Мой ответ здесь: stackoverflow.com/questions/3269445/ - person andersoj; 14.10.2010
comment
Правильно, если вы хотите выполнять свои задачи Callable вместо Runnable, решение, на которое ссылается @andersoj, намного проще. - person Tim Bender; 14.10.2010
comment
Y, и примечание _1 _- ›_ 2_ тривиально с удобным методом Executors.callable() - person andersoj; 14.10.2010
comment
Требует ли подход invokeAll () знания всех задач заранее? Это для неизвестного количества задач и с возможностью того, что задачи будут добавлены в очередь, даже когда она ожидает. Спасибо за все отличные предложения. - person cottonBallPaws; 14.10.2010
comment
Кроме того, для подхода Future возникнут ли проблемы в цикле for, если что-то будет добавлено как в ExecitorService, так и в коллекцию Futures, пока цикл зацикливается? - person cottonBallPaws; 14.10.2010
comment
Да, invokeAll() потребовалось бы знать и ждать, чтобы отправить блок задач в один момент времени - в противном случае я бы добавил его как полноценный ответ на ваш вопрос. Ответ @Tilo CompletionService имеет более слабое, но похожее ограничение. Я считаю, что у @Thilo был теперь удаленный ответ, который был правильным для вашего заданного вопроса. - person andersoj; 14.10.2010
comment
@littleFluffyKitty, подход Future в этом ответе более или менее эквивалентен решению invokeAll(). Поскольку коллекция futures фактически неизменяема, у нее не возникнет проблем с добавлением новых задач - фрагмент или invokeAll() будет блокироваться до тех пор, пока рассматриваемые задачи (здесь, независимо от того, были ли добавлены myRunnables) не будут завершены. - person andersoj; 14.10.2010
comment
@andersoj и @littleFluffyKitty, код, который я опубликовал, примерно эквивалентен invokeAll (), однако вызовы submit могут так же легко помещать экземпляры Future в BlockingQueue, а затем основной поток может выполнять синхронизированное poll в BlockingQueue, предоставляя вам гибкость ожидания, пока исполнитель не опустеет в течение некоторого времени, при условии, что все фьючерсы помещаются в одну очередь. - person Tim Bender; 15.10.2010
comment
@ Тим Бендер, согласен, ваш подход может быть расширен, чтобы быть более гибким ... с сопутствующей сложностью (из-за открытой частичной синхронизации), взимаемой с вызывающей стороны. Что означает, что очередь на мгновение пуста? - person andersoj; 16.10.2010
comment
@ Тим Бендер, в вашем исходном ответе, возможны ли утечки памяти из-за того, что связанный список удерживает фьючерсы? Как я могу быть уверен, что эти ссылки не хранятся? - person cottonBallPaws; 16.10.2010
comment
@littleFluffyKitty, в исходном ответе память для LinkedList будет существовать в пространстве кучи, пока переменная находится в области видимости и продолжает указывать на эту структуру данных в куче. Если вы определите переменную локально как часть какого-либо метода, она будет существовать на протяжении всего метода. Я действительно не знаю, как вы собираетесь использовать этот фрагмент. Конечно, я мыслю как Java-разработчик, а Android - это НЕ Java. - person Tim Bender; 19.10.2010
comment
это вызовет утечку памяти, сгенерирует много мусора, если есть огромное количество задач. - person ROROROOROROR; 09.07.2014
comment
@ROROROOROROR, No. No. No. - person Tim Bender; 10.07.2014
comment
@TimBender, но в моем проекте (spring-mvc) у меня есть процедура, которая имеет тысячи задач, с этим объектом future многие объекты передаются в Tenured Gen. Ну может это только для моего проекта. - person ROROROOROROR; 10.07.2014
comment
@ROROROOROROR, правильно. Но это не утечка памяти, потому что задачи и ответы ограничены и их можно будет собирать. Это не мусор, потому что, по-видимому, в выводе задачи есть что-то значимое. Если вывод задачи не имеет смысла, можно использовать другой механизм, например CountDownLatch (как указано в OP). Если вы отправите тысячи задач одновременно, большинство из них, вероятно, все равно будут сохранены, потому что пройдет много времени, прежде чем ExecutorService BlockingQueue будет освобожден от них. - person Tim Bender; 10.07.2014

My Scenario - это поисковый робот, который получает некоторую информацию с веб-сайта и затем обрабатывает ее. ThreadPoolExecutor используется для ускорения процесса, потому что за это время может быть загружено много страниц. Таким образом, новые задачи будут созданы в существующей задаче, потому что поисковый робот будет переходить по гиперссылкам на каждой странице. Проблема та же: основной поток не знает, когда все задачи выполнены, и может начать обрабатывать результат. Я использую простой способ определить это. Это не очень элегантно, но в моем случае работает:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
person googol4u    schedule 23.02.2012

Возможно, вы ищете CompletionService для управления пакетами задач см. также это ответ.

person Thilo    schedule 14.10.2010
comment
ваша ссылка кажется неработающей. - person qwertzguy; 06.11.2014
comment
@qwertzguy: Заменил ламер на более стабильный. Спасибо. - person Thilo; 06.11.2014

(Это попытка воспроизвести ранее удаленный ответ Тило с моими собственными корректировками.)

Я думаю, вам может потребоваться прояснить свой вопрос, поскольку существует неявное бесконечное условие ... в какой-то момент вы должны решить закрыть своего исполнителя, и в этот момент он больше не будет принимать никаких задач. Похоже, ваш вопрос подразумевает, что вы хотите подождать, пока вы узнаете, что никакие дальнейшие задачи не будут отправлены, о чем вы можете узнать только в своем собственном коде приложения.

Следующий ответ позволит вам плавно перейти на новый TPE (по любой причине), выполнив все текущие отправленные задачи и не отклоняя новые задачи для нового TPE. Это может ответить на ваш вопрос. @ Могущество Тило тоже.

Предполагая, что вы где-то определили видимый TPE, который используется как таковой:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

Затем вы можете написать процедуру замены TPE как таковую. Его также можно было бы написать с помощью синхронизированного метода, но я думаю, что это проще:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

В качестве альтернативы, если вы действительно не шутите, хотите убить пул потоков, тогда ваша сторона отправителя должна будет справиться с отклоненными задачами в какой-то момент, и вы можете использовать null для нового TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Что может вызвать проблемы с восходящим потоком, вызывающий должен знать, что делать с null.

Вы также можете заменить фиктивный TPE, который просто отклоняет каждое новое выполнение, но это эквивалентно тому, что происходит, если вы вызываете shutdown() на TPE.

person andersoj    schedule 14.10.2010
comment
Спасибо, что нашли время написать это, я все посмотрю и посмотрю, какой маршрут работает лучше всего. - person cottonBallPaws; 15.10.2010

Если вы не хотите использовать shutdown, выполните следующие действия:

  1. Выполните итерации по всем Future задачам из отправки на ExecutorService и проверьте статус с блокирующим вызовом get() на Future объекте, как предложено Tim Bender

  2. Используйте один из

    1. Using invokeAll on ExecutorService
    2. Использование CountDownLatch
    3. Использование ForkJoinPool или newWorkStealingPool из Executors (с Java 8)

invokeAll() на службе исполнителя также достигает той же цели, что и CountDownLatch

Связанный вопрос SE:

Как дождаться нескольких потоков завершить?

person Ravindra babu    schedule 19.04.2016

Вы можете вызвать waitTillDone () в классе Runner:

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

Чтобы использовать его, добавьте в свой проект эту зависимость gradle / maven: 'com.github.matejtymes:javafixes:1.0'

Дополнительные сведения см. Здесь: https://github.com/MatejTymes/JavaFixes или здесь: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

person Matej Tymes    schedule 29.04.2016

Попробуйте использовать размер очереди и количество активных задач, как показано ниже.

 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
                     try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }
person Tuğrul Karakaya    schedule 16.08.2019