Преобразование данных Slick Streaming и отправка фрагментированного ответа с помощью Akka Http

Цель состоит в том, чтобы выполнить потоковую передачу данных из базы данных, выполнить некоторые вычисления для этого фрагмента данных (это вычисление возвращает Future некоторого класса case) и отправить эти данные в виде ответа по фрагментам пользователю. В настоящее время я могу передавать данные и отправлять ответ без каких-либо вычислений. Однако я не могу выполнить это вычисление, а затем передать результат в потоковом режиме.

Это путь, который я реализовал.

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}

Функция getRds возвращает строки таблицы, сопоставленные с классом case (с помощью slick). Теперь рассмотрим функцию compute, которая принимает каждую строку в качестве входных данных и возвращает Future другого класса case. Что-то типа

def compute(x: Tweet) : Future[TweetNew] = ?

Как я могу реализовать эту функцию для переменной src и отправить фрагментированный ответ (в виде потока) этого вычисления пользователю.


person user3294786    schedule 27.12.2017    source источник


Ответы (2)


Вы можете преобразовать источник, используя _ 1_:

val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)

При необходимости отрегулируйте уровень параллельности.


Обратите внимание, что вам может потребоваться настроить несколько параметров, как указано в документации Slick < / а>:

Примечание. Для некоторых систем баз данных может потребоваться определенная настройка параметров сеанса для поддержки потоковой передачи без одновременного кэширования всех данных в памяти на стороне клиента. Например, PostgreSQL требует как .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (с желаемым размером страницы n), так и .transactionally для правильной потоковой передачи.

Так, если вы, например, используете PostgreSQL, ваш Source может выглядеть примерно так:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)
person Jeffrey Chung    schedule 27.12.2017
comment
Это не работает. Я запускаю команду curl, чтобы попасть в конечную точку. Однако соединение закрывается. - person user3294786; 27.12.2017
comment
@ user3294786 звучит так, будто он неправильно ожидает данных перед закрытием - person stan; 20.03.2018
comment
@StanislavPalatnik, вероятно, что-то здесь обнаружил; Я предлагаю добавить этап .log (), чтобы увидеть, что элементы действительно отправляются, как ожидалось (а не только завершение), не забудьте также установить akka.loglevel = DEBUG - person Konrad 'ktoso' Malawski; 22.03.2018
comment
@ user3294786: Вы отметили это как ответ. Так это сработало для вас сейчас? - person stan; 29.03.2018
comment
@StanislavPalatnik Да, это сработало. Я попробовал это на другом наборе данных, и он отлично сработал. Необходимо отладить причину сбоя ранее. - person user3294786; 30.03.2018

У вас должен быть способ маршалировать TweetNew, а также, если вы отправляете кусок с длиной 0, клиент может закрыть соединение.

Этот код работает с curl:

case class TweetNew(str: String)

def compute(string: String) : Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}
person Chetan Kumar Meena    schedule 21.03.2018
comment
Если вы отправляете кусок с длиной 0, клиент может закрыть соединение. - Я считаю, что Akka достаточно умен, чтобы отфильтровать их из потока за вас, поскольку они не могут быть представлены в виде фрагментов HTTP. - person Rich; 27.03.2018