Akka HTTP с использованием демаршаллера ответа

Я создаю конвейер данных, используя потоки Akka и Akka HTTP. Вариант использования довольно прост: получить веб-запрос от пользователя, который сделает две вещи. Сначала создайте сеанс, вызвав сторонний API, во-вторых, зафиксируйте этот сеанс в каком-либо постоянном хранилище, когда мы получим сеанс, он затем проксирует исходный запрос пользователя, но добавит данные сеанса.

Я начал работать над первой ветвью конвейера данных, которая является обработкой сеанса, но мне интересно, есть ли более элегантный способ декомпозиции ответа HTTP от стороннего API к POJO, который в настоящее время я использую Jackson.unmarshaller.unmarshal который возвращает CompletionStage<T>, который я затем должен развернуть в T. Это не очень элегантно, и я предполагаю, что у Akka HTTP есть более умные способы сделать это.

Вот код, который у меня есть прямо сейчас

private final Source<Session, NotUsed> session =
        Source.fromCompletionStage(
                getHttp().singleRequest(getSessionRequest(), getMat())).
                map(r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
                map(f -> f.toCompletableFuture().get()).
                alsoTo(storeSession);

person Overly Excessive    schedule 10.05.2017    source источник


Ответы (1)


Akka Streams предлагает вам mapAsync этап для обработки асинхронных вычислений в вашем конвейере настраиваемым и неблокирующим способом.

Ваш код должен стать чем-то вроде

Source.fromCompletionStage(
                getHttp().singleRequest(getSessionRequest(), getMat())).
                mapAsync(4, r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
                alsoTo(storeSession);

Обратите внимание, что:

  1. в данном случае это не просто вопрос элегантности, поскольку CompletableFuture.get является блокирующим вызовом. Это может вызвать ужасные проблемы в вашем конвейере.
  2. параметр Int, необходимый для mapAsync (параллелизм), позволяет точно настроить количество параллельных асинхронных операций, которые могут выполняться одновременно.

Дополнительную информацию в mapAsync можно найти в документы.

person Stefano Bonetti    schedule 10.05.2017
comment
Это очень здорово, я надеялся, что Akka HTTP будет использовать что-то еще более высокого уровня, что-то вроде Flow<HttpResponse, T, ?›`, которое я мог бы подключить к своему конвейеру, и это было бы еще лучше. - person Overly Excessive; 10.05.2017
comment
Что касается блокировки звонков, неужели в этом случае все будет так плохо? У меня сложилось впечатление, что фактический конвейер все равно будет выполняться в потоке актера, поэтому он не будет блокировать работающий поток. - person Overly Excessive; 10.05.2017
comment
Проблема в том, что по умолчанию нет такого понятия, как поток актора. Блокировка внутри актора истощит всю инфраструктуру потоков, совместно используемую вашей системой акторов. Всякий раз, когда это неизбежно (например, при работе с блокирующим вводом-выводом), вы можете настроить этап конвейера на использование отдельного пула потоков. В данном случае в этом нет необходимости. Подробнее здесь doc.akka.io/docs/akka /текущий/общий/ . - person Stefano Bonetti; 10.05.2017