Вопросы по теме 'project-reactor'

ParallelFlux против flatMap () для задачи блокировки ввода-вывода
У меня есть цепочка Project Reactor, которая включает задачу блокировки (сетевой вызов, нам нужно дождаться ответа). Я хочу одновременно запускать несколько блокирующих задач. Похоже, что можно использовать ParallelFlux или flatMap (), простые...
5860 просмотров
schedule 17.12.2023

Отсутствующие сообщения от TopicProcessor в Reactor 3
Я запускаю простой тест, в котором я публикую сообщения в TopicProcessor из 4 потоков, а в подписчике просто добавляю их в коллекцию. Код следующий: @Test public void testProcessingMessages() throws Exception { int numberOfMessages = 1000;...
446 просмотров
schedule 06.01.2024

Преодоление Reactor's Flux от gRPC StreamObserver
Я хочу создать Reactor Flux из gRPC StreamObserver . Это необходимо сделать до тех пор, пока StreamObserver не реализует соответствующие интерфейсы изначально (см., например, эта проблема ). То, что я придумал, примерно похоже на следующее:...
1437 просмотров
schedule 07.04.2024

Reactor 3.x - ограничить время группы By Flux
есть ли способ заставить Flux, сгенерированный groupBy (), завершиться через определенный период времени (или аналогичным образом ограничить максимальное количество «открытых» групп) независимо от полноты восходящего потока? У меня примерно...
1624 просмотров
schedule 26.12.2023

Планировщики Reactor продолжают работать еще долго после завершения основного потока? Как с этим справиться?
У меня есть вопрос о том, как очистить рабочие потоки планировщика при использовании Reactor 3. Flux.range(1, 10000) .publishOn(Schedulers.newElastic("Y")) .doOnComplete(() -> { // WHAT should one do to ensure the worker threads are...
3521 просмотров
schedule 24.03.2024

Как заставить AuditorAware работать с Spring Data Mongo Reactive
Spring Security 5 предоставляет ReactiveSecurityContextHolder для извлечения SecurityContext из контекста Reactive , но когда я хочу реализовать AuditorAware и получить работу для прослушивания автоматически, но не работает. В настоящее...
2476 просмотров

Можно ли запустить Mono параллельно и агрегировать результат
Я знаю, что можно связать Моно, например ... Mono<String> resultAMono = loadA(); Mono<String> resultBMono = resultA.flatMap(resultA -> loadB()); Это будет цепочка, и resultBMono будет запущен, когда resultAMono вернется .......
12342 просмотров
schedule 04.10.2022

Перезванивать друг другу службы Rest с использованием Spring WebFlux и повторно использовать результат промежуточных служб?
Я новичок в Spring WebFlux, но я пытаюсь добиться этого, как указано ниже. First Service -> AuthService Mono<String> ->gives auth token Second Service -> ServiceSecodn Uses output from above service Third service -> Uses output...
537 просмотров
schedule 01.02.2024

Использование методов Mono с динамическими параметрами
У меня есть функция, которая возвращает ввод как Mono: public static Mono<Integer> emitter(int param){ return Mono.just(param) .delayElement(Duration.ofMillis(100)); //delay to simulate http response } Я хотел бы вызвать...
1476 просмотров

Шина данных с Java Flux API
Я изучал Java Flux API, когда в некоторых руководствах заметил, что вы можете подписаться только на одного издателя. Тем не менее, я вижу вариант использования, когда полезно иметь несколько издателей для одной подписки: своего рода шина...
134 просмотров

Команды соединения реактора
Я практикую Reactor в Spring WebFlux и считаю, что кривая обучения довольно крутая. Все, что я хотел сделать, это что-то вроде: Данные, поступающие от контроллера. Я запрашиваю настройки сайта, которые возвращают Mono. Я делаю запрос к...
1118 просмотров
schedule 15.11.2023

Springboot. Реактивный веб-клиент. Соединение преждевременно закрыто ДО ответа
Я столкнулся с этой проблемой В объединенном соединении обнаружена ошибка response.netty.http.client.HttpClientOperations $ PrematureCloseException: соединение преждевременно закрыто ДО ответа ". Я собираю метрики с графитового сервера...
3730 просмотров

Почему я получаю «Разрешено только одно подключение для получения подписчика», когда в ответе есть код состояния ошибки HTTP?
В запросе на отдых я отправляю другой запрос в веб-службу с помощью Spring WebClient и хочу вернуть результат вызывающей стороне: return webClient.post() .uri(url) .body(...) .retrieve() .bodyToMono(String::class.java) .map {...
1691 просмотров

Проект "Реактор". Mono.map () против Mono.flatMap ()
В чем принципиальная разница между ними с точки зрения Mono ? Из документации я прочитал, что flatMap действует асинхронно и map синхронно. Но для меня это не имеет смысла, потому что Mono - это все о параллелизме, и этот момент непонятен....
16584 просмотров
schedule 01.06.2024

Как условно установить код состояния на HTTP-сервере Project Reactor Netty?
Я подозреваю, что мне здесь не хватает чего-то действительно очевидного, но я не могу понять, как достичь очень простого варианта использования HttpServer в response-netty. По сути, я хочу реализовать возможность отвечать 200 OK , когда объект...
128 просмотров
schedule 29.02.2024

Несоответствие поведения между интеграционным тестом Webflux WebTestClient и вызовом Postman REST
Я борюсь с поведением несоответствия между интеграционным тестом и простым вызовом REST . Позвольте мне объяснить: у меня была ошибка в моем производственном коде, вызывающая исключение: NoSuchElementException: Source was empty , когда я...
254 просмотров

Как развернуть реактивный (netty) на сервере приложений (в моем случае weblogic)?
Глядя вокруг (пример Как SpringBoot работает на серверах приложений ), очень просто переместить приложение springboot rest (таким образом, используя spring-boot-starter-web) на сервер приложений, потому что, я думаю, в конце концов у них всех есть...
205 просмотров

Оператор repeatWhenEmpty не работает в контексте запроса веб-сокета RSocket, но работает в HTTP-вызовах
Я столкнулся со странным поведением оператора repeatWhenEmpty , которое не имеет для меня никакого смысла. Я загрузил репозиторий на Github с минимальным воспроизводимым образцом: https://github.com/codependent/rsocket-rating-service ....
381 просмотров

Элемент задержки в RxJava Flowable
В издателе Flux @ProjectReactor есть очень интуитивно понятный оператор delayElements , который вводит задержку между каждым испускаемым элементом. Скажем, например, следующая треска испускает элемент в секунду. Flux.fromIterable(List.of(1, 2,...
109 просмотров
schedule 13.03.2024

Невозможно прочитать файл ресурсов пути к классам неблокирующим образом внутри контейнера докеров
Я пытаюсь загрузить файл JSON sample.json из каталога src/main/resources . Мне нужно сопоставить этот json-объект с Java-объектом. И мое приложение реактивное, я использую Spring webflux. Я следил за блогом Саймона , чтобы придумать...
554 просмотров