Как создать Source и вставлять в него элементы вручную?

Я хочу создать пользовательскую StatefulStage, который должен работать как метод groupBy и испускать элементы Source[A, Unit], но я не понимаю, как создать экземпляр Source[A, Unit] и передать ему входящий элемент. Вот заглушка:

class GroupBy[A, Mat]() extends StatefulStage[A, Source[A, Unit]] {
  override def initial: StageState[A, Source[A, Unit]] = new StageState[A, Source[A, Unit]] {
    override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
      val source: Source[A, Unit] = ... // Need to create source here

      // and push `elem` to `source` here

      emit(List(source).iterator, ctx)
    }
  }
}

Вы можете использовать следующий фрагмент для тестового потока GroupBy (он должен печатать события из созданного потока):

case class Tick()
case class Event(timestamp: Long, sessionUid: String, traffic: Int)

implicit val system = ActorSystem()
import system.dispatcher

implicit val materializer = ActorMaterializer()

var rnd = Random
rnd.setSeed(1)

val eventsSource = Source
  .tick(FiniteDuration(0, SECONDS), FiniteDuration(1, SECONDS), () => Tick)
  .map {
    case _ => Event(System.currentTimeMillis / 1000, s"session-${rnd.nextInt(5)}", rnd.nextInt(10) * 10)
  }

val flow = Flow[Event]
  .transform(() => new GroupByUntil)
  .map {
    (source) => source.runForeach(println)
  }

eventsSource
  .via(flow)
  .runWith(Sink.ignore)
  .onComplete(_ => system.shutdown())

Кто-нибудь может объяснить мне, как это сделать?

ОБНОВИТЬ:

Я написал следующий метод onPush на основе этого ответа, но он не печатал события. Насколько я понимаю, я могу отправить элемент в источник только тогда, когда он работает как часть потока, но я хочу запустить поток за пределами GroupBy в тестовом фрагменте. Если я запущу поток в GroupBy, как в этом примере, он будет обрабатывать события и отправлять их в Sink.ignore. Я думаю, что это причина, по которой мой тестовый фрагмент не печатал события.

override def onPush(elem: A, ctx: Context[Source[A, Unit]]): SyncDirective = {
  val source: Source[A, ActorRef] = Source.actorRef[A](1000, OverflowStrategy.fail)
  val flow = Flow[A].to(Sink.ignore).runWith(source)

  flow ! elem

  emit(List(source.asInstanceOf[Source[A, Unit]]).iterator, ctx)
}

Итак, как это исправить?


person Maxim    schedule 24.11.2015    source источник
comment
Вот ответ на этот вопрос от команды Akka.   -  person Maxim    schedule 26.11.2015