Передача вывода потока в вещание в Akka Streams Graph

Я пытаюсь написать график Akka Stream. Код, который я написал,

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
   (sink1, sink2) =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Row](2))
      val flow = source ~> flow1 ~> flow2
      flow.out ~> bcast.in
      bcast.out(0) ~> sink1
      bcast.out(1) ~> flow3 ~> flow4 ~> sink2
      ClosedShape
})

val (f1, f2) = graph.run()
val consolidated = Future.sequence(List(f1, f2))
Await.result(consolidated, Duration.Inf)

Этот код не компилируется, потому что я не могу соединить выход из потока с входом bcast.

Я могу соединить выход источника с входом bcast, но я не могу этого сделать, потому что некоторые части являются общими между двумя ветвями. Поэтому я должен создать ветку в графе только после потока2

Кроме того... Я не уверен, правильно ли я пишу график, потому что он возвращает два фьючерса Done, и мне нужно объединить их в одно будущее вручную, используя Sequence.


person Knows Not Much    schedule 30.08.2017    source источник


Ответы (1)


Вы не можете связать свой график в 2 шага, так как комбинатор ~> не возвращает вам поток. На самом деле это декларативная операция с сохранением состояния.

Лучшим подходом здесь было бы связать ваш график за один раз, например.

  source ~> flow1 ~> flow2 ~> bcast
                              bcast          ~>          sink1
                              bcast ~> flow3 ~> flow4 ~> sink2

или, в качестве альтернативы, вы можете разделить объявления, добавив этап в конструктор (и получив его форму), например.

  val flow2s = builder.add(flow2)

  source ~> flow1 ~> flow2s.in
  flow2s.out ~> bcast
                bcast          ~>          sink1
                bcast ~> flow3 ~> flow4 ~> sink2

Что касается материализованных Future, вам нужно выбрать то, что имеет смысл в качестве материализованного значения вашего графика в целом. Если вам нужен только один из двух материализованных Future Sink, вам нужно передать только его методу GraphDSL.create. В противном случае, если вас интересуют оба Future, имеет смысл sequence или zip их вместе.

person Stefano Bonetti    schedule 30.08.2017