Я столкнулся со странным поведением оператора repeatWhenEmpty
, которое не имеет для меня никакого смысла. Я загрузил репозиторий на Github с минимальным воспроизводимым образцом: https://github.com/codependent/rsocket-rating-service.
Рассмотрим этот контроллер, который предлагает две конечные точки (HTTP @GetMapping("/requestRating")
- WebSocket @MessageMapping("request-rating")
. Обратите внимание на этот вызов .repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1))))
в середине цепочки generateRating()
:
@RestController
class RatingServiceRestController {
private val FAIL_RATE = 0
private val logger = LoggerFactory.getLogger(javaClass)
@GetMapping("/requestRating")
fun getRatingHttp(ratingRequest: RatingRequest): Mono<Rating> {
return generateRating(ratingRequest)
}
@MessageMapping("request-rating")
fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
return generateRating(ratingRequest)
}
private fun generateRating(ratingRequest: RatingRequest): Mono<Rating> {
return doGenerateRating(ratingRequest)
.doOnNext {
logger.info("Next1 {}", it)
}
.doOnCancel {
logger.info("Cancel1")
}
.doOnSuccess {
logger.info("Success1 {}", it)
}
.doOnError { throwable ->
logger.error("Error1 {}", throwable)
}
.doOnTerminate {
logger.info("Terminate1")
}
.repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1))))
.doOnNext {
logger.info("Next2 {}", it)
}
.doOnCancel {
logger.info("Cancel2")
}
.doOnSuccess {
logger.info("Success2 {}", it)
}
.doOnError { throwable ->
logger.error("Error2 {}", throwable)
}
.doOnTerminate {
logger.info("Terminate2")
}
}
private fun doGenerateRating(ratingRequest: RatingRequest): Mono<Rating> {
return Mono.defer {
val random = (0..100).random()
if (random <= FAIL_RATE) {
Mono.empty()
} else {
Mono.just(Rating(ratingRequest.songId, (0..10).random()))
}
}
}
}
После запуска приложения я могу вызвать http://localhost:8080/requestRating?songId=1234
, и он вернет результат, показывающий эти журналы, как и ожидалось:
RatingServiceRestController : Next1 Rating(songId=1234, value=1)
RatingServiceRestController : Success1 Rating(songId=1234, value=1)
RatingServiceRestController : Terminate1
RatingServiceRestController : Cancel1
RatingServiceRestController : Next2 Rating(songId=1234, value=1)
RatingServiceRestController : Success2 Rating(songId=1234, value=1)
RatingServiceRestController : Terminate2
Когда я вызываю ту же логику из Websocket:
- Доступ
http://localhost:8080/index.html
- Заполните любую строку и нажмите отправить
Как ни странно, это все журналы, которые я вижу:
RatingServiceRestController : Next1 Rating(songId=asfdasf, value=2)
RatingServiceRestController : Success1 Rating(songId=asfdasf, value=2)
RatingServiceRestController : Terminate1
Примерно через три минуты это появляется:
RatingServiceRestController : Cancel2
RatingServiceRestController : Cancel1
Несмотря на создание элемента, как показано в Next1
, операторы doOnXXX
после repeatWhenEmpty
не вызываются. Очевидно, что и клиент не получает результата.
Что тут происходит? Как я могу использовать repeatWhenEmpty
в контексте веб-сокета RSocket?
ОБНОВЛЕНИЕ:
Я добавил оператор log()
, чтобы получить дополнительную информацию.
HTTP-запрос:
10:37:01.957 INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1 : onSubscribe(MonoNext.NextSubscriber)
10:37:01.959 INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1 : request(unbounded)
10:37:01.967 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Next1 Rating(songId=1234, value=0)
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Success1 Rating(songId=1234, value=0)
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Terminate1
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Cancel1
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1 : onNext(Rating(songId=1234, value=0))
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Next2 Rating(songId=1234, value=0)
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Success2 Rating(songId=1234, value=0)
10:37:01.968 INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController : Terminate2
10:37:01.976 INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1 : onComplete()
Запрос RSocket:
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2 : onContextUpdate(Context1{reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$720/0x0000000800597c40@6dfdbeee})
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2 : onSubscribe(MonoNext.NextSubscriber)
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2 : request(1)
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Next1 Rating(songId=asdf, value=0)
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Success1 Rating(songId=asdf, value=0)
10:37:29.143 INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Terminate1
Через три минуты:
10:40:27.802 INFO 5202 --- [ parallel-1] c.c.r.r.c.RatingServiceRestController : Cancel2
10:40:27.802 INFO 5202 --- [ parallel-1] reactor.Mono.Defer.2 : cancel()
10:40:27.802 INFO 5202 --- [ parallel-1] c.c.r.r.c.RatingServiceRestController : Cancel1
Как видите, есть некоторые отличия
- В запросе RSocket есть вызов
onContextUpdate
. - RSocket запрашивает 1 элемент, HTTP без ограничений
- Выполнение RSocket, похоже, повторяет попытку или что-то делает, несмотря на выброс элемента (onNext). ЦП застрял на 6%, выполняя некоторую работу, чего не было в HTTP-вызовах, как вы можете видеть на следующем изображении:
ОБНОВЛЕНИЕ 2:
Я занимался отладкой и обнаружил эту разницу в исполнении во время первой отмены, особенно здесь, в классе Operators
:
public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context) {
if (multiple == null) return;
if (!knownToBeFinite) return;
Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);
if (hook != null) {
try {
multiple.forEachRemaining(o -> {
if (o != null) {
try {
hook.accept(o);
}
catch (Throwable t) {
log.warn("Error while discarding element from an Iterator, continuing with next element", t);
}
}
});
}
catch (Throwable t) {
log.warn("Error while discarding Iterator, stopping", t);
}
}
}
HTTP:
Контекст:
Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);
hook
имеет значение null, поэтому он не выполняется:
if (hook != null) {
try {
multiple.forEachRemaining(o -> {...
RSocket:
Контекст:
В этом случае крючок:
И он бесконечно зацикливается в блоке forEachRemaining
:
if (hook != null) {
try {
multiple.forEachRemaining(o -> {
if (o != null) {
try {
hook.accept(o);
}
catch (Throwable t) {
log.warn("Error while discarding element from an Iterator, continuing with next element", t);
}
}
});
}