Как сохранить прямой поток Kafka JSON в Cassandra?

Мне нужно сохранить потоковые данные искры в Cassandra. Поток исходит от Kafka, а сообщение Kafka имеет формат JSON, как показано ниже.

{
  "status": "NOT_AVAILABLE",
  "itemid": "550672332",
  "qty": 0,
  "lmts": "2017-11-18T10:39:21-08:00",
  "timestamp": 1511030361000
}

Я написал ниже код, чтобы сделать это в Spark 2.2.0.

case class NliEvents(itemid: String, status: String, qty: String)

def main(args: Array[String]): Unit = {
 .....
  val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )


    val valueStream = stream.map(_.value())
    val cassandraCrud = new CassandraOperations
    import com.datastax.spark.connector._

    val columns = SomeColumns("itemid", "status", "qty")
    val keySpace = configuration.getString(env + ".cassandra.keyspace")
    val gson = new Gson()
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats
    valueStream.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        val mapped = rdd.map(records => {
          val json = parse(records)
          val events = json.extract[NliEvents]
          events
        }
        )
        mapped.saveToCassandra(keySpace, "nli_events", columns)
      }
    })
}

Когда я запускаю этот код, я получаю

java.io.NotSerializableException: org.json4s.DefaultFormats$

ошибка. Может не правильно делаю.


person Rishi Saraf    schedule 20.11.2017    source источник


Ответы (1)


можете ли вы заменить свой оператор foreach следующим кодом.

valueStream.mapPartitions(x => {
  val lst = scala.collection.mutable.ListBuffer[NliEvents]()
  while (x.hasNext) {
    val json = parse(x.next())
    val events = json.extract[NliEvents]
    lst += events

  }
  lst.toList.iterator
  }
).saveToCassandra(keySpace, "nli_events",columns)

Он должен работать. Дайте мне знать, если вы получите какие-либо ошибки.

person vindev    schedule 20.11.2017