Как да изчакам всички задачи в ThreadPoolExecutor да завършат, без да изключвам Executor?

Не мога да използвам shutdown() и awaitTermination(), защото е възможно нови задачи да бъдат добавени към ThreadPoolExecutor, докато чака.

Така че търся начин да изчакам, докато ThreadPoolExecutor изпразни опашката си и завърши всичките си задачи, без да спира добавянето на нови задачи преди този момент.

Ако има някаква разлика, това е за Android.

Благодаря

Актуализация: Много седмици по-късно, след като преразгледах това, открих, че модифициран CountDownLatch работи по-добре за мен в този случай. Ще запазя отговора маркиран, защото се отнася повече за това, което попитах.


person cottonBallPaws    schedule 14.10.2010    source източник
comment
Ако сте добре с добавянето на нови задачи, какво ще стане, ако никога не завърши?   -  person Kylar    schedule 14.10.2010
comment
Мисля, че littleFluffyKitty иска само да изчака старите задачи да приключат.   -  person Thilo    schedule 14.10.2010
comment
Не съм толкова загрижен за възможността никога да не завърши, защото ако това е така, тогава нещо друго вече е ужасно счупено. Ако всичко останало се провали, бих могъл да внедря някакво време за изчакване, но съм добре да приема, че ще приключи. Искам да може да приема нови задачи, докато чака, или да го кажа по друг начин, искам нови задачи да могат да се добавят след извикване на чакането.   -  person cottonBallPaws    schedule 14.10.2010
comment
Мисля, че това е свързан въпрос: stackoverflow.com/questions/3402895/java-threadpool-usage. Може да искате да проверите отговорите, изброени там...   -  person sjlee    schedule 14.10.2010
comment
Предложих друг, потенциален отговор. Все още не разбирам какво имаш предвид, като изчакваш всички задачи да изпразнят опашката, но не искаш да попречиш на новите задачи да влизат в опашката... в един момент трябва да теглиш чертата и TPE не може да тегли линията за вас. Извикването на shutdown() е начинът, по който създавате границата, след която не могат да се изпращат нови задачи; извикването на awaitTermination() създава предимство, което гарантира, че блокирате, докато всички по-рано изпратени задачи не бъдат изпълнени.   -  person andersoj    schedule 14.10.2010
comment
Искате ли просто да изчакате асинхронно известие, че опашката е празна и да я затворите точно по това време? TPE (и по-общо, семантиката на нишката) вероятно няма да ви позволи да направите това и ако можехте, би било много трудно да се разсъждава от страна на подателите на задачата.   -  person andersoj    schedule 14.10.2010
comment
Вижте също stackoverflow.com/questions/1250643/   -  person rogerdpack    schedule 09.11.2012


Отговори (7)


Ако се интересувате да знаете кога е завършена определена задача или определена група от задачи, можете да използвате ExecutorService.submit(Runnable). Извикването на този метод връща Future обект, който може да бъде поставен в Collection, който вашата основна нишка след това ще повтори, извиквайки Future.get() за всеки един. Това ще накара основната ви нишка да спре изпълнението, докато ExecutorService не обработи всички Runnable задачи.

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}
person Tim Bender    schedule 14.10.2010
comment
+1 Това изглежда е най-добрият начин. Трябва обаче да се направи на ниво приложение, където изпращате задачата, а не на ниво услуга на изпълнителя. - person Thilo; 14.10.2010
comment
+1 или използвайте invokeAll() за пакет от задачи, за да изчакате завършването им. Вижте моя отговор тук: stackoverflow.com/questions/3269445/ - person andersoj; 14.10.2010
comment
Добре, ако желаете да направите задачите си Callable вместо Runnable, решението, посочено от @andersoj, е много по-просто. - person Tim Bender; 14.10.2010
comment
Y и бележка Runnable-›Callable е тривиална с метода за удобство Executors.callable() - person andersoj; 14.10.2010
comment
Подходът invokeAll() изисква ли познаване на всички задачи предварително? Това е за неизвестен брой задачи и с възможност задачите да бъдат добавени към опашката, дори докато чака. Благодаря за всички страхотни предложения. - person cottonBallPaws; 14.10.2010
comment
Освен това, за подхода Future, цикълът for ще срещне ли проблеми, ако нещо бъде добавено както към executorService, така и към фючърсната колекция, докато цикълът се изпълнява? - person cottonBallPaws; 14.10.2010
comment
Да, invokeAll() ще изисква познаване и изчакване за изпращане на блок от задачи в един момент - в противен случай щях да го добавя като пълноценен отговор на вашия въпрос. Отговорът CompletionService на @Thilo има по-слабо, но подобно ограничение. Вярвам, че @Thilo имаше вече изтрит отговор, който беше правилен за зададения от вас въпрос. - person andersoj; 14.10.2010
comment
@littleFluffyKitty, подходът Future в този отговор е повече или по-малко еквивалентен на решението invokeAll(). Тъй като колекцията futures е ефективно неизменна, тя сама по себе си няма да има проблеми с добавените нови задачи -- фрагментът или invokeAll() ще блокират, докато въпросните задачи (тук, каквито и myRunnables да са добавени) завършат. - person andersoj; 14.10.2010
comment
@andersoj и @littleFluffyKitty, кодът, който публикувах, е приблизително еквивалентен на invokeAll(), но извикванията към submit биха могли също толкова лесно да поставят Future екземплярите в BlockingQueue и след това основната нишка може да направи poll във времето на BlockingQueue, предоставяйки ви гъвкавостта да се изчака, докато изпълнителят е празен за известно време, при условие че всички фючърси са поставени в една и съща опашка. - person Tim Bender; 15.10.2010
comment
@Tim Bender, съгласен съм, вашият подход може да бъде разширен, за да бъде по-гъвкав... със съпътстваща сложност (поради частичната изложена синхронност), наложена на повикващия. Какво означава опашката да е за момент празна? - person andersoj; 16.10.2010
comment
@Tim Bender, във вашия първоначален отговор, възможно ли е да има изтичане на памет поради свързания списък, който държи фючърсите? Как мога да съм сигурен, че тези референции не се съхраняват? - person cottonBallPaws; 16.10.2010
comment
@littleFluffyKitty, В оригиналния отговор паметта за LinkedList ще съществува в пространството на купчината, докато променливата е в обхват и продължава да сочи към тази структура от данни в купчината. Ако дефинирате променливата локално като част от някакъв метод, тя ще съществува по време на продължителността на метода. Наистина не знам как възнамерявате да използвате този фрагмент. Разбира се, аз мисля като Java разработчик, а Android НЕ е Java. - person Tim Bender; 19.10.2010
comment
това ще доведе до изтичане на памет, ще генерира много боклук, ако има огромно количество задачи. - person ROROROOROROR; 09.07.2014
comment
@ROROROOROROR, Не, Не, Не. - person Tim Bender; 10.07.2014
comment
@TimBender, но в моя проект (spring-mvc) имам процедура, която има хиляди задачи, с този future обект, много обекти се предават на Tenured Gen. Е, може би това е само за моя проект. - person ROROROOROROR; 10.07.2014
comment
@ROROROOROROR, правилно. Но това не е изтичане на памет, защото задачите и отговорите са с обхват и ще могат да се събират. Това не е боклук, защото вероятно има нещо смислено в изхода на задачата. Ако изходът на задачата не е смислен, тогава може да се използва друг механизъм като CountDownLatch (както се посочва в OP). Ако изпратите хиляди задачи наведнъж, повечето вероятно ще бъдат наети така или иначе, защото ще мине много време, преди ExecutorService BlockingQueue да бъде освободен от тях. - person Tim Bender; 10.07.2014

My Scenario е уеб робот, който извлича информация от уеб сайт и след това я обработва. ThreadPoolExecutor се използва за ускоряване на процеса, тъй като много страници могат да бъдат заредени навреме. Така че нови задачи ще бъдат създадени в съществуващата задача, тъй като роботът ще следва хипервръзки във всяка страница. Проблемът е същият: основната нишка не знае кога всички задачи са изпълнени и може да започне да обработва резултата. Използвам прост начин да определя това. Не е много елегантно, но работи в моя случай:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
person googol4u    schedule 23.02.2012

Може би търсите CompletionService за управление на партиди задачи, вижте също това отговор.

person Thilo    schedule 14.10.2010
comment
връзката ви изглежда повредена. - person qwertzguy; 06.11.2014
comment
@qwertzguy: Заменен с ламер, по-стабилен. Благодаря. - person Thilo; 06.11.2014

(Това е опит да възпроизведа по-ранния, изтрит отговор на Thilo с мои собствени корекции.)

Мисля, че може да се наложи да изясните въпроса си, тъй като има имплицитно безкрайно условие... в един момент трябва да решите да изключите своя изпълнител и в този момент той няма да приема повече задачи. Вашият въпрос изглежда предполага, че искате да изчакате, докато разберете, че няма да бъдат изпращани други задачи, което можете да знаете само в собствения си код на приложение.

Следният отговор ще ви позволи плавно да преминете към нов TPE (по каквато и да е причина), като завършите всички подадени в момента задачи и не отхвърляте нови задачи към новия TPE. Може да отговори на въпроса ви. @Thilo може също.

Ако приемем, че сте дефинирали някъде видим TPE в употреба като такъв:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

След това можете да напишете рутината за размяна на TPE като такава. Може да се напише и с помощта на синхронизиран метод, но мисля, че това е по-просто:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Като алтернатива, ако наистина не се шегувате, искате да убиете пула от нишки, тогава вашата подаваща страна ще трябва да се справи с отхвърлени задачи в даден момент и можете да използвате null за новия TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Което може да причини проблеми нагоре по веригата, повикващият ще трябва да знае какво да прави с null.

Можете също така да замените с фиктивен TPE, който просто отхвърля всяко ново изпълнение, но това е еквивалентно на това, което се случва, ако извикате shutdown() на TPE.

person andersoj    schedule 14.10.2010
comment
Благодаря, че отделихте време да напишете това, ще разгледам всичко и ще видя кой маршрут работи най-добре. - person cottonBallPaws; 15.10.2010

Ако не искате да използвате shutdown, следвайте подходите по-долу:

  1. Преминете през всички Future задачи от изпращане на ExecutorService и проверете състоянието с блокиращо извикване get() на Future обект, както е предложено от Tim Bender

  2. Използвайте един от

    1. Using invokeAll on ExecutorService
    2. Използване на CountDownLatch
    3. Използване на ForkJoinPool или newWorkStealingPool от Executors(тъй като java 8)

invokeAll() на услугата изпълнител също постига същата цел на CountDownLatch

Свързан SE въпрос:

Как да изчакате няколко нишки за завършване?

person Ravindra babu    schedule 19.04.2016

Можете да извикате waitTillDone() на клас Runner:

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

За да го използвате, добавете тази зависимост от gradle/maven към вашия проект: 'com.github.matejtymes:javafixes:1.0'

За повече подробности вижте тук: https://github.com/MatejTymes/JavaFixes или тук: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

person Matej Tymes    schedule 29.04.2016

Опитайте да използвате размера на опашката и броя на активните задачи, както е показано по-долу

 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
                     try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }
person Tuğrul Karakaya    schedule 16.08.2019