Планировщики Reactor продолжают работать еще долго после завершения основного потока? Как с этим справиться?

У меня есть вопрос о том, как очистить рабочие потоки планировщика при использовании Reactor 3.

Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> { 
    // WHAT should one do to ensure the worker threads are cleaned up
    logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));

Когда я выполняю приведенный выше код, я вижу, что после того, как основной поток завершил выполнение, рабочий поток (потоки) все еще находится в состоянии ОЖИДАНИЯ в течение некоторого времени.

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)

Есть ли способ контролировать это? То есть можно ли их утилизировать onComplete()? Я пробовал Schedulers.shutdownNow(), и это не помогает.

С другой стороны, когда я это делаю, я могу контролировать удаление планировщика. Какой способ является предпочтительным/рекомендуемым?

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
        Flux.range(1, 10000)
        .concatWith(Flux.empty())
        .publishOn(s)
        .doOnComplete(() -> {           
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
        })
        .subscribe(x -> logger.debug(x+ "**"));

person Chetya    schedule 20.12.2017    source источник


Ответы (2)


Если вы используете Schedulers.new[Elastic|...], то вы обязаны отслеживать получившийся Scheduler, если хотите его закрыть. Schedulers.shutdownNow() закроет планировщики по умолчанию, используемые библиотекой, только если вы не указали явно, например Schedulers.elastic() (обратите внимание, что префикс new отсутствует).

Лучший способ очистки после выполнения всех операций — использовать doFinally. Это асинхронно выполнит обратный вызов очистки после onError|onComplete|cancel событий. Лучше убедитесь, что это последний оператор в цепочке, хотя он действительно пытается выполниться последним во всех случаях.

Единственное предостережение заключается в том, что он выполняется в том же потоке, что и предыдущие операторы, другими словами, тот самый поток, который вы пытаетесь закрыть... s.dispose() в обратном вызове doFinally остановит исполнителя после обработки его очереди задач, поэтому в этом случае будет небольшая задержка перед исчезновением потока.

Вот пример, который выводит информацию о потоке, переключается на пользовательский эластичный поток и закрывает его в doFinally (добавлены фильтр и материализация, чтобы получить более короткий журнал с лучшим представлением о том, как разыгрываются события):

@Test
public void schedulerFinallyShutdown() throws InterruptedException {
    ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
    Logger logger = Loggers.getLogger("foo");
    CountDownLatch latch = new CountDownLatch(1);
    reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
    Flux.range(1, 10000)
        .publishOn(s)
        .concatWith(Flux.<Integer>empty().doOnComplete(() -> {
            for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
                System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState());
            }
        }))
        .doFinally(sig -> {
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
            latch.countDown();
        })
        .filter(x -> x % 1000 == 0)
        .materialize()
        .subscribe(x -> logger.info(x+ "**"));

    latch.await();
    for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
        System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState());
    }
} 

Это распечатывает:

11:24:36.608 [X-2] INFO  foo - onNext(1000)**
11:24:36.611 [X-2] INFO  foo - onNext(2000)**
11:24:36.611 [X-2] INFO  foo - onNext(3000)**
11:24:36.612 [X-2] INFO  foo - onNext(4000)**
11:24:36.612 [X-2] INFO  foo - onNext(5000)**
11:24:36.612 [X-2] INFO  foo - onNext(6000)**
11:24:36.612 [X-2] INFO  foo - onNext(7000)**
11:24:36.613 [X-2] INFO  foo - onNext(8000)**
11:24:36.613 [X-2] INFO  foo - onNext(9000)**
11:24:36.613 [X-2] INFO  foo - onNext(10000)**
last element    X-2 RUNNABLE
last element    elastic-evictor-1 TIMED_WAITING
last element    Monitor Ctrl-Break RUNNABLE
last element    Signal Dispatcher RUNNABLE
last element    Finalizer WAITING
last element    Reference Handler WAITING
last element    main WAITING
11:24:36.626 [X-2] INFO  foo - onComplete()**
11:24:36.627 [X-2] INFO  foo - Shut down all Scheduler worker threads
after cleanup   Monitor Ctrl-Break RUNNABLE
after cleanup   Signal Dispatcher RUNNABLE
after cleanup   Finalizer WAITING
after cleanup   Reference Handler WAITING
after cleanup   main RUNNABLE
person Simon Baslé    schedule 21.12.2017

Вам нужно вызвать метод blockLast() на вашем потоке, чтобы убедиться, что он завершен. Это особенно важно, если вы запускаете поток параллельно или в другом потоке, отличном от основного.

NB: вам нужно вызвать publishOn выше в цепочке, см. справочное руководство чтобы понять, почему позиция публикации важна.

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
Flux.range(1, 10000)
    .concatWith(Flux.empty())
    .publishOn(s)
    .doOnComplete(() -> {           
       logger.info("Shut down all Scheduler worker threads");
    }).blocklast();
s.dispose();
person excbot    schedule 21.12.2017