Поточно предаване на данни от Play Enumerator през Spray, използвайки Chunked Responses

Имам данни, изтеглени от Reactive Mongo, които трябва да прокарам през API на Spray Rest. Надявах се да направя това с Chunked Responses. Въпреки това открих, че Enumerator, който се връща от Reactive Mongo, е в състояние да премине през Spray по-бързо, отколкото може да се справи с мрежовата връзка. Резултатът е, че връзката е прекратена.

Успях да разреша този проблем с помощта на функцията Spray Ack в междинен актьор. Това заедно с Blocking Await ми позволи да създам обратен натиск върху Enumerator. Аз обаче не искам наистина Await. Бих искал да разбера начин за поточно предаване на данните през Spray по неблокиращ начин.

Възможно ли е това? Имам няколко идеи, които биха могли да проработят, ако мога да попълня липсващите части.

1) Създайте обратен натиск върху Enumerator по неблокиращ начин (нямате идея как да направите това. Предложения?)

2) Разделете изброителя на по-малки изброители. Започнете да консумирате всеки изброител само след като предишният е завършен. Мога да направя това с помощта на актьор. Това, което ми липсва тук, е начин да разделя по-големия изброител на по-малки изброители.

3) Използвайте нещо като метода "Enumeratee.take". Където бих взел известен брой записи от Преброителя, след което, когато съм готов, ще взема още. Това наистина е същото решение като 2), но от малко по-различна гледна точка. Това обаче ще изисква преброителят да поддържа състояние. Има ли начин да използвате Enumeratee.take няколко пъти срещу един и същи изброител, без да рестартирате отначало всеки път?

Може ли някой да предложи някакви алтернативни предложения, които биха могли да работят? Или ако не е възможно, моля, уведомете ме.

Използвам Play Enumerators 2.3.5


person Wade    schedule 27.10.2014    source източник


Отговори (2)


Мисля, че идеята е да имплементирате Iteratee, чийто fold метод извиква предоставеното обратно извикване само след получаване на Spray Ack. Нещо като:

def handleData(input: Input[String]) = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]): Future[B] = {
    (sprayActor ? input).flatMap {
      case success => folder(Cont(handleData))
      case error => folder(Error(...))
      case done => ...
    }
  }
}

val initialIteratee = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]) = folder(Cont(handleData))
}

enumerator.run(initialIteratee)

Това трябва да е неблокиращо, но гарантира, че следващото парче се изпраща само след като предишното парче е успешно.

person lmm    schedule 28.10.2014
comment
Съгласен съм, това изглежда обещаващо и преследвах точно тази идея, преди да прочета публикацията ви. Въпреки това никога не можах да разбера синтаксиса. Къде се извиква handleData във вашия пример? Къде да вземем тази първоначална входна стойност? Не съм сигурен в Play 2.3.5 как да приложа това. - person Wade; 28.10.2014
comment
Мисля, че може би имате нужда от отделен първоначален итератор; Добавих един към кода. - person lmm; 28.10.2014
comment
Да, вярвам, че вече съм го решил. Публикацията ви определено помогна. Трябваше да добавя първоначален iteratee и да променя няколко други неща, за да заработи, но мисля, че вече го имам. Благодаря ви за помощта. - person Wade; 28.10.2014
comment
Ще има ли проблем с това, че методът handleData е рекурсивен? Т.Е. ако имаше много данни, това в крайна сметка щеше ли да доведе до препълване на стека? Или има някаква оптимизация под капака, за да се предотврати това? - person Wade; 29.10.2014
comment
Бих очаквал Iteratee да направи нещо умно с Future[B]s, но не знам; Мога само да предложа да го тествате. - person lmm; 29.10.2014

След доста експерименти (и помощ от stackoverflow) успях да разбера решение, което изглежда работи. Той използва Spray Chunked Responses и изгражда итератор около това.

Съответните кодови фрагменти са включени тук:

ChunkedResponder.scala

package chunkedresponses

import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._

object ChunkedResponder {
  case class Chunk(data: HttpData)
  case object Shutdown
  case object Ack
}

class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._
  def receive:Receive = {
    case chunk: Chunk =>
      responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
      context.become(chunking)
    case Shutdown =>
      responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
      context.stop(self)
  }

  def chunking:Receive = {
    case chunk: Chunk =>
      responder.forward(MessageChunk(chunk.data).withAck(Ack))
    case Shutdown =>
      responder.forward(ChunkedMessageEnd().withAck(Ack))
      context.stop(self)
  }
}

ChunkIteratee.scala

package chunkedresponses

import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}

class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._
  private implicit val timeout = Timeout(30.seconds)

  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))

    def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match {
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
      case Input.Empty => waitForAck(Future.successful(Unit))
      case Input.EOF =>
        chunkedResponder ! Shutdown
        Done(Unit, Input.EOF)
    }

    folder(Step.Cont(step))
  }
}

пакет.scala

import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext

import scala.concurrent.ExecutionContext

package object chunkedresponses {
  implicit class ChunkedRequestContext(requestContext: RequestContext) {
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory) {
      val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)
      enumerator.run(iteratee)
    }
  }
}
person Wade    schedule 04.12.2014
comment
моля, поставете важната част от отговора си в SO, вместо да правите връзка към вашия блог (който вече не съществува) - person alvi; 28.11.2016
comment
Актуализиран, за да включва кодовите фрагменти от оригиналната публикация в блога, която вече е мъртва. - person Wade; 21.02.2017