У меня есть вопрос о том, как очистить рабочие потоки планировщика при использовании Reactor 3.
Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> {
// WHAT should one do to ensure the worker threads are cleaned up
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));
Когда я выполняю приведенный выше код, я вижу, что после того, как основной поток завершил выполнение, рабочий поток (потоки) все еще находится в состоянии ОЖИДАНИЯ в течение некоторого времени.
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)
Есть ли способ контролировать это? То есть можно ли их утилизировать onComplete()
? Я пробовал Schedulers.shutdownNow()
, и это не помогает.
С другой стороны, когда я это делаю, я могу контролировать удаление планировщика. Какой способ является предпочтительным/рекомендуемым?
reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
.concatWith(Flux.empty())
.publishOn(s)
.doOnComplete(() -> {
s.dispose();
logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));