Потоковый тест Akka с помощью TestKit и ScaleTest

Я получаю NullPointerException при тестировании своего потокового приложения akka с помощью scalatest и не понимаю, почему ... Я, вероятно, что-то пропустил в Akka Streams, я просто ныряю в это.

Я использую общую структуру кода для масштабирования с scala 2.12.4 и sbt 1.0.3 Это мое приложение

object CdrToMongoReactiveStream extends App {

  implicit val system = ActorSystem("cdr-data-generator")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext=materializer.executionContext
  import RandomCdrJsonProtocol._

  val randomCdrThrottledSource : Source[RandomCdr,NotUsed]= Source
    .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange)))
    .throttle(throughput,1.second,1,ThrottleMode.shaping)
    .named("randomCdrThrottledSource")

  val cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= Flow[RandomCdr]
    .map((cdr: RandomCdr) => cdr.toJson.toString())
    .named("cdrJsonParseFlow")

  val mongodbBulkSink : Sink[String,NotUsed] = Flow[String]
    .map((json: String) => Document.parse(json))
    .map((doc: Document) => new InsertOneModel[Document](doc))
    .grouped(bulkSize)
    .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒
      Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
    }
    .to(Sink.ignore)

  val f = randomCdrThrottledSource.via(cdrJsonParseFlow).runWith(mongodbBulkSink)
}

И мой тестовый файл

class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {

  import RandomCdrJsonProtocol._

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      val future = CdrToMongoReactiveStream.randomCdrThrottledSource
        // line 30 in the error
        .runWith(Sink.head)(CdrToMongoReactiveStream.materializer)

      val cdr = Await.result(future,10.second)
      cdr shouldBe a [RandomCdr]
    }
  }
  "cdrJsonParseFlow" should {
    "parse RandomCdr to correct json format" in {
      val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)
      val (pub,sub) = TestSource.probe[RandomCdr]
        .via(CdrToMongoReactiveStream.cdrJsonParseFlow)
        .toMat(TestSink.probe[String])(Keep.both)
        .run()

      sub.request(1)
      pub.sendNext(randomCdr)
      sub.expectNext() shouldBe equal(randomCdr.toJson.toString())
    }
  }
}

И сообщение об ошибке

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at CdrToMongoReactiveStreamSpec.$anonfun$new$2(CdrToMongoReactiveStreamSpec.scala:30)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.invokeWithFixture$1(WordSpecLike.scala:1076)
    at org.scalatest.WordSpecLike.$anonfun$runTest$1(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike.runTest$(WordSpecLike.scala:1070)
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.$anonfun$runTests$1(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike.runTests$(WordSpecLike.scala:1146)
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike.$anonfun$run$1(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike.run(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike.run$(WordSpecLike.scala:1190)
    at org.scalatest.WordSpec.run(WordSpec.scala:1881)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
    at org.scalatest.tools.Runner$.run(Runner.scala:850)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

person vgkowski    schedule 24.11.2017    source источник
comment
Вероятно, одно из значений в строке 30 - ноль. Вы пробовали распечатать их или использовать инструменты отладки IDE, чтобы выяснить, какой именно? Неясно, какая строка была строкой 30 в исходном файле, поскольку вы, кажется, удалили некоторые строки с момента ее запуска. Не могли бы вы добавить строки к своему вопросу или, если они не важны, снова запустить тест без строк, чтобы мы получили полезную трассировку стека?   -  person Brian McCutchon    schedule 25.11.2017
comment
Ошибка не имеет смысла для вашего кода. Исключение нулевого указателя адресовано CdrToMongoReactiveStreamSpec.scala:30, но ваш фрагмент кода занимает всего 25 строк.   -  person Ramón J Romero y Vigil    schedule 25.11.2017
comment
Извините, я упростил код для готовности. Строка 30 - это пробег с (...)   -  person vgkowski    schedule 26.11.2017


Ответы (1)


Решил проблему с объявлением Source, Flow и Sink вне основной программы

object CdrToMongoReactiveStream {

  def randomCdrThrottledSource(msisdnLength : Int,timeRange : Int, throughput : Int): Source[RandomCdr,NotUsed]= {
    Source
      .fromIterator(() => Iterator.continually(RandomCdr(msisdnLength,timeRange)))
      .throttle(throughput,1.second,1,ThrottleMode.shaping)
      .named("randomCdrThrottledSource")
  }

  def cdrJsonParseFlow : Flow[RandomCdr,String,NotUsed]= {
    import RandomCdrJsonProtocol._

    Flow[RandomCdr]
      .map((cdr: RandomCdr) => cdr.toJson.toString())
      .named("cdrJsonParseFlow")
  }

  def mongodbBulkSink(collection : MongoCollection[Document], bulkSize : Int) : Sink[String,NotUsed] = {

    Flow[String]
      .map((json: String) => Document.parse(json))
      .map((doc: Document) => new InsertOneModel[Document](doc))
      .grouped(bulkSize)
      .flatMapConcat { (docs: Seq[InsertOneModel[Document]]) ⇒
        Source.fromPublisher(collection.bulkWrite(docs.toList.asJava))
      }
      .to(Sink.ignore)
  }

  def main(args: Array[String]): Unit = {
    val f = randomCdrThrottledSource(msisdnLength,timeRange,throughput)
      .via(cdrJsonParseFlow).runWith(mongodbBulkSink(collection,bulkSize))

    logger.info("Generated random data")
  }
}

и тестовый файл

class CdrToMongoReactiveStreamSpec extends WordSpec with Matchers {

  import CdrToMongoReactiveStream._
  import RandomCdrJsonProtocol._

  implicit val system = ActorSystem("cdr-data-generator")
  implicit val materializer = ActorMaterializer()

  val collection = new Fongo("mongo test server").getDB("cdrDB").getCollection("cdr")
  val randomCdr = RandomCdr("+33612345678",1511448336402L,"+33612345678","SMS","OUT",0,0,0)

  "randomCdrThrottledSource" should {
    "generate RandomCdr elements only" in {
      val future = CdrToMongoReactiveStream.randomCdrThrottledSource(8,86400000,1)
        .runWith(Sink.head)

      val cdr = Await.result(future,5.second)
      cdr shouldBe a [RandomCdr]
    }
  }
}
person vgkowski    schedule 27.11.2017