В конфигурации Reactor Netty HttpClient / HttpServer я не могу получить объект запроса в сообщении на сервер.

HttpClient Reactor-Netty отправляет запрос HttpClientRequest на HttpServer. На стороне сервера я не могу получить объект запроса из HttpServerRequest. Я могу получить FluxByteBuf из HttpServerRequest, но не объект ByteBuf в объекте Flux. Обычно подписка на Flux позволяет мне получить объект запроса, но здесь это не работает. Ответ от сервера успешно получен клиентом. Кто-нибудь знает, почему подписка на Flux / Mono не работает на стороне сервера клиента / сервера Reactor-Netty?

Код клиента:

public class Client {

    private static final Logger log = Logger.getLogger(Client.class.getSimpleName());

    public static void main(String[] args) throws InterruptedException {

        Consumer<byte[]> onSuccess = (byte[] response) -> {
            ElectionResponse electionResponse = SerializationUtils.deserialize(response);
            log.info("response in onSuccess: "+electionResponse);

        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };

        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };
        ElectionRequest electionRequest = new ElectionRequest("aRequest");
        byte[] requestBytes = SerializationUtils.serialize(electionRequest);
        ByteBuf requestByteBuf = Unpooled.copiedBuffer(requestBytes);

        HttpClient.create()
                .tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
                .port(61005)
                .post()
                .uri("/echo")
                .send(Mono.just(requestByteBuf))
                .responseContent()
                .aggregate()
                .asByteArray()
                .subscribe(onSuccess, onError, onCompletion);

        try {
            Thread.sleep(15000);
        } catch(InterruptedException ie) {
            ie.printStackTrace();
        }
    }
}

Код сервера:

public class Server {

    private static final Logger log = Logger.getLogger(Server.class.getSimpleName());

    public static void main(String[] args) {

        ElectionResponse electionResponse = new ElectionResponse("aResponse");
        byte[] responseArray = SerializationUtils.serialize(electionResponse);

        Consumer<ByteBuf> onSuccess = (ByteBuf request) -> {
            System.out.println("onSuccess: Request received!");
        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };
        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };

        DisposableServer server =
            HttpServer.create()
                      .host("10.0.0.19")
                      .port(61005)
                      .route(routes ->
                          routes
                            .post("/echo",
                                (request, response) -> {
                                    request.receive().retain().next().subscribe(onSuccess, onError, onCompletion);
                                    return response.send(Mono.just(Unpooled.copiedBuffer(responseArray).retain()));
                                                       }
                                    ))
                        .bindNow();

        server.onDispose()
                .block();
    }
}

Зависимости среды и maven следующие:

Apache Maven 3.6.1
Maven home: /usr/share/maven
Java version: 11.0.6, vendor: Oracle Corporation, runtime: /home/linuxlp/opt/graalvm/graalvm-svm-linux-20.1.0-ea+28
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"

    <dependencies>
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.9.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.reactivestreams</groupId>
            <artifactId>reactive-streams</artifactId>
            <version>1.0.3</version>
        </dependency>

person Chris Grimes    schedule 18.05.2020    source источник


Ответы (1)


Что вы думаете о составлении такого запроса / ответа

public class Server {

    private static final Logger log = Logger.getLogger(Server.class.getSimpleName());

    public static void main(String[] args) {

        Consumer<byte[]> onSuccess = (byte[] request) -> {
            System.out.println("onSuccess: Request received!");
        };
        Consumer<Throwable> onError = (Throwable ex) -> {
            ex.getMessage();
        };
        Runnable onCompletion = () -> {
            System.out.println("Message Completed");

        };

        DisposableServer server =
                HttpServer.create()
                        .host("10.0.0.19")
                        .port(61005)
                        .route(routes ->
                                routes.post("/echo",
                                        (request, response) ->
                                                response.send(request.receive()
                                                                     .aggregate()
                                                                     .asByteArray()
                                                                     .doOnNext(onSuccess)
                                                                     .doOnError(onError)
                                                                     .doOnTerminate(onCompletion)
                                                                     .flatMap(bytes -> {
                                                                         ElectionRequest electionRequest = (ElectionRequest) SerializationUtils.deserialize(bytes);
                                                                         ElectionResponse electionResponse = new ElectionResponse(electionRequest.getStr());
                                                                         return Mono.just(Unpooled.copiedBuffer(SerializationUtils.serialize(electionResponse)));
                                                                     }))))
                        .bindNow();

        server.onDispose()
                .block();
    }
}
person Violeta Georgieva    schedule 20.05.2020
comment
Спасибо за ваш ответ, но когда я его реализую, у меня возникает та же проблема, что и при использовании subscribe, то есть onSuccess в doOnNext никогда не вызывается. Как я уже сказал, на стороне сервера мне нужно получить объект запроса домена из запроса ByteBufFlux, воздействовать на него в службе, а затем вернуть объект ответа, содержащийся в объекте HttpServerResponse. Ответ работает нормально. Когда клиент отправляет несколько запросов, клиент получает ожидаемый ответ (-ы). Проблема находится на стороне сервера, где лямбда onSuccess не вызывается подпиской или doOnNext. - person Chris Grimes; 20.05.2020
comment
создайте воспроизводимый пример и откройте здесь проблему github.com/reactor/reactor-netty - person Violeta Georgieva; 20.05.2020