Я хотел бы реализовать поток для обработки результатов с разбивкой на страницы (например, базовая служба возвращает некоторые результаты, но также указывает, что доступны дополнительные результаты, делая другой запрос, передавая, например, курсор).
Что я сделал до сих пор:
Я реализовал следующий поток и тест, но поток не завершается.
object AdditionalRequestsFlow { private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val in = builder.add(Flow[Request]) val bcast = builder.add(Broadcast[Request](2)) val merge = builder.add(Zip[Request, Response]()) in ~> bcast ~> merge.in0 bcast ~> flow ~> merge.in1 FlowShape(in.in, merge.out) }) } def flow[Request, Response, Output]( inputFlow: Flow[Request, Response, NotUsed], anotherRequest: (Request, Response) => Option[Request], extractOutput: Response => Output, mergeOutput: (Output, Output) => Output ): Flow[Request, Output, NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val start = b.add(Flow[Request]) val merge = b.add(Merge[Request](2)) val underlying = b.add(keepRequest(inputFlow)) val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList)) val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) => (res, anotherRequest(req, res)) }) val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work start ~> merge ~> underlying ~> unzip.in unzip.out0 ~> finish merge <~ unOption <~ unzip.out1 FlowShape(start.in, finish.out) }) } }
Тест:
import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Sink, Source} import org.scalatest.FlatSpec import org.scalatest.Matchers._ import cats.syntax.option._ import org.scalatest.concurrent.ScalaFutures.whenReady class AdditionalRequestsFlowSpec extends FlatSpec { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() case class Request(max: Int, batchSize: Int, offset: Option[Int] = None) case class Response(values: List[Int], nextOffset: Option[Int]) private val flow: Flow[Request, Response, NotUsed] = { Flow[Request] .map { request => val start = request.offset.getOrElse(0) val end = Math.min(request.max, start + request.batchSize) val nextOffset = if (end == request.max) None else Some(end) val result = Response((start until end).toList, nextOffset) result } } "AdditionalRequestsFlow" should "collect additional responses" in { def anotherRequest(request: Request, response: Response): Option[Request] = { response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) } } def extract(x: Response): List[Int] = x.values def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b val requests = Request(max = 35, batchSize = 10) :: Request(max = 5, batchSize = 10) :: Request(max = 100, batchSize = 1) :: Nil val expected = requests.map { x => (0 until x.max).toList } val future = Source(requests) .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge)) .runWith(Sink.seq) whenReady(future) { x => x shouldEqual expected } } }
Реализовал тот же поток ужасным, блокирующим образом, чтобы проиллюстрировать, чего я пытаюсь достичь:
def uglyHackFlow[Request, Response, Output]( inputFlow: Flow[Request, Response, NotUsed], anotherRequest: (Request, Response) => Option[Request], extractOutput: Response => Output, mergeOutput: (Output, Output) => Output ): Flow[Request, Output, NotUsed] = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() Flow[Request] .map { x => def grab(request: Request): Output = { val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :( val another = anotherRequest(request, response) val output = extractOutput(response) another.map { another => mergeOutput(output, grab(another)) } getOrElse output } grab(x) } }
Это работает (но мы не должны материализовать что-либо /
Await
на данный момент).Проверено http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks, который, как мне кажется, содержит ответ, однако я не могу найти его там. В моем случае я бы ожидал, что цикл должен содержать один элемент в большинстве случаев, поэтому не должно происходить ни переполнения буфера, ни полного голодания, но, очевидно, это происходит.
Пытался отладить поток с помощью
.withAttributes(Attributes(LogLevels(...)))
, однако это не приводит к каким-либо результатам, несмотря на, казалось бы, правильно настроенные регистраторы.
Я ищу подсказки, как исправить метод flow
, сохраняя ту же подпись и семантику (тест пройдет).
Или, может быть, я делаю что-то совершенно нестандартное (например, в akka-stream-contrib
уже есть функция, которая решает эту проблему)?
<entry/
›, а затем<link href="..." rel="next"/>
для следующего URL-адреса, который нужно вызвать, чтобы получить следующий ответ с большим количеством записей. В настоящее время я подключил свойuglyHackFlow
, и он отлично работает для этого варианта использования. - person John M   schedule 18.12.2016