Я создаю конвейер данных, используя потоки Akka и Akka HTTP. Вариант использования довольно прост: получить веб-запрос от пользователя, который сделает две вещи. Сначала создайте сеанс, вызвав сторонний API, во-вторых, зафиксируйте этот сеанс в каком-либо постоянном хранилище, когда мы получим сеанс, он затем проксирует исходный запрос пользователя, но добавит данные сеанса.
Я начал работать над первой ветвью конвейера данных, которая является обработкой сеанса, но мне интересно, есть ли более элегантный способ декомпозиции ответа HTTP от стороннего API к POJO, который в настоящее время я использую Jackson.unmarshaller.unmarshal
который возвращает CompletionStage<T>
, который я затем должен развернуть в T
. Это не очень элегантно, и я предполагаю, что у Akka HTTP есть более умные способы сделать это.
Вот код, который у меня есть прямо сейчас
private final Source<Session, NotUsed> session =
Source.fromCompletionStage(
getHttp().singleRequest(getSessionRequest(), getMat())).
map(r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
map(f -> f.toCompletableFuture().get()).
alsoTo(storeSession);