Потоковая передача данных из счетчика воспроизведения через спрей с использованием фрагментированных ответов.

У меня есть данные, извлекаемые из Reactive Mongo, которые мне нужно передать через API-интерфейс Spray Rest. Я надеялся сделать это с помощью Chunked Responses. Однако я обнаружил, что Enumerator, который возвращается из Reactive Mongo, способен проталкивать Spray быстрее, чем может обрабатывать сетевое соединение. В результате соединение обрывается.

Мне удалось решить эту проблему с помощью функции Spray Ack в промежуточном действующем субъекте. Это вместе с ожиданием блокировки позволило мне создать обратное давление на перечислитель. Однако я действительно не хочу Await. Я хотел бы найти способ передавать данные через спрей неблокирующим образом.

Это возможно? У меня есть несколько идей, которые могут сработать, если я смогу заполнить недостающие части.

1) Создайте обратное давление на Enumerator неблокирующим образом (не знаю, как это сделать. Предложения?)

2) Разбейте счетчик на более мелкие счетчики. Начинайте использовать каждый счетчик только после завершения предыдущего. Я могу сделать это с помощью актера. Чего мне здесь не хватает, так это способа разбить большой счетчик на более мелкие счетчики.

3) Используйте что-то вроде метода "Enumeratee.take". Где бы я взял какое-то количество записей из Перечислителя, потом, когда буду готов, возьму еще. Это действительно то же самое решение, что и 2), но с немного другой точки зрения. Однако это потребует от счетчика поддерживать состояние. Есть ли способ использовать Enumeratee.take несколько раз против одного и того же счетчика без перезапуска каждый раз с самого начала?

Может ли кто-нибудь предложить какие-либо альтернативные предложения, которые могут работать? Или если это невозможно, пожалуйста, дайте мне знать.

Я использую Play Enumerators 2.3.5.


person Wade    schedule 27.10.2014    source источник


Ответы (2)


Я думаю, что идея заключается в том, что вы реализуете Iteratee, чей метод fold вызывает предоставленный обратный вызов только после получения Spray Ack. Что-то типа:

def handleData(input: Input[String]) = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]): Future[B] = {
    (sprayActor ? input).flatMap {
      case success => folder(Cont(handleData))
      case error => folder(Error(...))
      case done => ...
    }
  }
}

val initialIteratee = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]) = folder(Cont(handleData))
}

enumerator.run(initialIteratee)

Это должно быть неблокирующим, но гарантирует, что следующий фрагмент будет отправлен только после успешного завершения предыдущего фрагмента.

person lmm    schedule 28.10.2014
comment
Я согласен, это кажется многообещающим, и я преследовал именно эту идею, прежде чем читать ваш пост. Однако я никогда не мог понять синтаксис. Где вызывается handleData в вашем примере? Где мы можем получить это начальное значение ввода? Я не уверен в Play 2.3.5, как это реализовать. - person Wade; 28.10.2014
comment
Я думаю, может быть, вам нужна отдельная начальная итерация; Я добавил один в код. - person lmm; 28.10.2014
comment
Да, я считаю, что я решил это сейчас. Ваш пост определенно помог. Мне пришлось добавить начальную итерацию и изменить несколько других вещей, чтобы заставить ее работать, но я думаю, что теперь она у меня есть. Спасибо за вашу помощь. - person Wade; 28.10.2014
comment
Будет ли проблема с рекурсивным методом handleData? т.е. если бы было много данных, это в конечном итоге вызвало бы переполнение стека? Или есть какая-то оптимизация под капотом, чтобы предотвратить это? - person Wade; 29.10.2014
comment
Я бы ожидал, что Iteratee сделает что-нибудь умное с Future[B], но я не знаю; Могу только предложить проверить. - person lmm; 29.10.2014

После изрядного количества экспериментов (и помощи от stackoverflow) я смог найти решение, которое, кажется, работает. Он использует разбивку ответов по частям и строит итерацию вокруг этого.

Соответствующие фрагменты кода включены здесь:

ChunkedResponder.scala

package chunkedresponses

import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._

object ChunkedResponder {
  case class Chunk(data: HttpData)
  case object Shutdown
  case object Ack
}

class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._
  def receive:Receive = {
    case chunk: Chunk =>
      responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
      context.become(chunking)
    case Shutdown =>
      responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
      context.stop(self)
  }

  def chunking:Receive = {
    case chunk: Chunk =>
      responder.forward(MessageChunk(chunk.data).withAck(Ack))
    case Shutdown =>
      responder.forward(ChunkedMessageEnd().withAck(Ack))
      context.stop(self)
  }
}

ChunkIteratee.scala

package chunkedresponses

import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}

class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._
  private implicit val timeout = Timeout(30.seconds)

  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))

    def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match {
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
      case Input.Empty => waitForAck(Future.successful(Unit))
      case Input.EOF =>
        chunkedResponder ! Shutdown
        Done(Unit, Input.EOF)
    }

    folder(Step.Cont(step))
  }
}

пакет .scala

import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext

import scala.concurrent.ExecutionContext

package object chunkedresponses {
  implicit class ChunkedRequestContext(requestContext: RequestContext) {
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory) {
      val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)
      enumerator.run(iteratee)
    }
  }
}
person Wade    schedule 04.12.2014
comment
пожалуйста, поместите важную часть вашего ответа в SO, а не ссылку на свой блог (которого больше не существует) - person alvi; 28.11.2016
comment
Обновлено, чтобы включить фрагменты кода из исходного сообщения в блоге, которое теперь мертво. - person Wade; 21.02.2017