Оператор repeatWhenEmpty не работает в контексте запроса веб-сокета RSocket, но работает в HTTP-вызовах

Я столкнулся со странным поведением оператора repeatWhenEmpty, которое не имеет для меня никакого смысла. Я загрузил репозиторий на Github с минимальным воспроизводимым образцом: https://github.com/codependent/rsocket-rating-service.

Рассмотрим этот контроллер, который предлагает две конечные точки (HTTP @GetMapping("/requestRating") - WebSocket @MessageMapping("request-rating"). Обратите внимание на этот вызов .repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1)))) в середине цепочки generateRating():

@RestController
class RatingServiceRestController {

    private val FAIL_RATE = 0
    private val logger = LoggerFactory.getLogger(javaClass)

    @GetMapping("/requestRating")
    fun getRatingHttp(ratingRequest: RatingRequest): Mono<Rating> {
        return generateRating(ratingRequest)
    }

    @MessageMapping("request-rating")
    fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
        return generateRating(ratingRequest)
    }

    private fun generateRating(ratingRequest: RatingRequest): Mono<Rating> {
        return doGenerateRating(ratingRequest)
                .doOnNext {
                    logger.info("Next1 {}", it)
                }
                .doOnCancel {
                    logger.info("Cancel1")
                }
                .doOnSuccess {
                    logger.info("Success1 {}", it)
                }
                .doOnError { throwable ->
                    logger.error("Error1 {}", throwable)
                }
                .doOnTerminate {
                    logger.info("Terminate1")
                }
                .repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1))))
                .doOnNext {
                    logger.info("Next2 {}", it)
                }
                .doOnCancel {
                    logger.info("Cancel2")
                }
                .doOnSuccess {
                    logger.info("Success2 {}", it)
                }
                .doOnError { throwable ->
                    logger.error("Error2 {}", throwable)
                }
                .doOnTerminate {
                    logger.info("Terminate2")
                }
    }

    private fun doGenerateRating(ratingRequest: RatingRequest): Mono<Rating> {
        return Mono.defer {
            val random = (0..100).random()
            if (random <= FAIL_RATE) {
                Mono.empty()
            } else {
                Mono.just(Rating(ratingRequest.songId, (0..10).random()))
            }
        }
    }
}

После запуска приложения я могу вызвать http://localhost:8080/requestRating?songId=1234, и он вернет результат, показывающий эти журналы, как и ожидалось:

RatingServiceRestController    : Next1 Rating(songId=1234, value=1)
RatingServiceRestController    : Success1 Rating(songId=1234, value=1)
RatingServiceRestController    : Terminate1
RatingServiceRestController    : Cancel1
RatingServiceRestController    : Next2 Rating(songId=1234, value=1)
RatingServiceRestController    : Success2 Rating(songId=1234, value=1)
RatingServiceRestController    : Terminate2

Когда я вызываю ту же логику из Websocket:

  1. Доступ http://localhost:8080/index.html
  2. Заполните любую строку и нажмите отправить

Как ни странно, это все журналы, которые я вижу:

RatingServiceRestController    : Next1 Rating(songId=asfdasf, value=2)
RatingServiceRestController    : Success1 Rating(songId=asfdasf, value=2)
RatingServiceRestController    : Terminate1

Примерно через три минуты это появляется:

RatingServiceRestController    : Cancel2
RatingServiceRestController    : Cancel1

Несмотря на создание элемента, как показано в Next1, операторы doOnXXX после repeatWhenEmpty не вызываются. Очевидно, что и клиент не получает результата.

Что тут происходит? Как я могу использовать repeatWhenEmpty в контексте веб-сокета RSocket?

ОБНОВЛЕНИЕ:

Я добавил оператор log(), чтобы получить дополнительную информацию.

HTTP-запрос:

10:37:01.957  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onSubscribe(MonoNext.NextSubscriber)
10:37:01.959  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : request(unbounded)
10:37:01.967  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Next1 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Success1 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Terminate1
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Cancel1
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onNext(Rating(songId=1234, value=0))
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Next2 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Success2 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Terminate2
10:37:01.976  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onComplete()

Запрос RSocket:

10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : onContextUpdate(Context1{reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$720/0x0000000800597c40@6dfdbeee})
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : onSubscribe(MonoNext.NextSubscriber)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : request(1)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Next1 Rating(songId=asdf, value=0)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Success1 Rating(songId=asdf, value=0)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Terminate1

Через три минуты:

10:40:27.802  INFO 5202 --- [     parallel-1] c.c.r.r.c.RatingServiceRestController    : Cancel2
10:40:27.802  INFO 5202 --- [     parallel-1] reactor.Mono.Defer.2                     : cancel()
10:40:27.802  INFO 5202 --- [     parallel-1] c.c.r.r.c.RatingServiceRestController    : Cancel1

Как видите, есть некоторые отличия

  1. В запросе RSocket есть вызов onContextUpdate.
  2. RSocket запрашивает 1 элемент, HTTP без ограничений
  3. Выполнение RSocket, похоже, повторяет попытку или что-то делает, несмотря на выброс элемента (onNext). ЦП застрял на 6%, выполняя некоторую работу, чего не было в HTTP-вызовах, как вы можете видеть на следующем изображении:

введите здесь описание изображения

ОБНОВЛЕНИЕ 2:

Я занимался отладкой и обнаружил эту разницу в исполнении во время первой отмены, особенно здесь, в классе Operators:

    public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context) {
        if (multiple == null) return;
        if (!knownToBeFinite) return;

        Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);
        if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {
                    if (o != null) {
                        try {
                            hook.accept(o);
                        }
                        catch (Throwable t) {
                            log.warn("Error while discarding element from an Iterator, continuing with next element", t);
                        }
                    }
                });
            }
            catch (Throwable t) {
                log.warn("Error while discarding Iterator, stopping", t);
            }
        }
    }

HTTP:

Контекст:

введите здесь описание изображения

Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);

hook имеет значение null, поэтому он не выполняется:

if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {...

RSocket:

Контекст:

введите здесь описание изображения

В этом случае крючок:

введите здесь описание изображения

И он бесконечно зацикливается в блоке forEachRemaining:

    if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {
                    if (o != null) {
                        try {
                            hook.accept(o);
                        }
                        catch (Throwable t) {
                            log.warn("Error while discarding element from an Iterator, continuing with next element", t);
                        }
                    }
                });
            }

person codependent    schedule 31.05.2020    source источник


Ответы (1)


Как видно из https://github.com/rsocket/rsocket-java/issues/860 похоже, что проблема в Project Reactor.

Я перешел на следующего оператора, как было рекомендовано, что решило проблему:

.repeatWhenEmpty(30) { longFlux ->
      longFlux.delayElements(Duration.ofSeconds(1))
             .doOnNext { logger.info("Repeating {}", it) }
}
person codependent    schedule 04.06.2020
comment
корневая проблема была исправлена ​​в выпусках 3.2.19 и 3.3.8, см. github.com / response / response-core / issues / 2196 - person Simon Baslé; 27.07.2020
comment
@Simon Спасибо за информацию! - person codependent; 28.07.2020