Spark UnionRDD нельзя преобразовать в HasOffsetRanges

Поскольку я получаю сообщения из нескольких разных тем kafka. Поэтому мне нужно использовать метод StreamingContext.union для объединения потоков. Но у меня есть некоторые проблемы при попытке обновить смещения kafka для Zoopkeeper.

ошибка следующая:

java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at com.qingqing.spark.util.KafkaManager.updateZKOffsets(KafkaManager.scala:75)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:43)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Мой код выглядит следующим образом: введите здесь описание изображения

введите здесь описание изображения

Может ли кто-нибудь помочь мне в поиске проблемы. заранее спасибо


person Dialong    schedule 24.06.2017    source источник


Ответы (1)


Это не совсем то, как вы работаете с API прямой потоковой передачи для Kafka и Spark Streaming. Поскольку вы используете прямой подход, здесь нет приемника, поэтому нет необходимости в нескольких потоках, а только в одном потоке, который использует все темы. Вам нужно извлечь смещения для всех тем, которые вы хотите использовать, а затем создать один экземпляр DirectKafkaInputDStream, созданный createDirectStream. Поскольку у меня нет кода, грубый набросок будет выглядеть так:

val offsets: Map[TopicAndPartition, Long] = 
  topics.map { /* create the (TopicAndPartition, Long) tuple here for each topic */ }

val kafkaInputStream = 
  KafkaUtils.createDirectStream(ssc, kafkaParams, offsets, (mmd) => (mmd.key, mmd.value))

Для тем, в которых не сохранены смещения, просто начните со смещения 0.

Что касается HasOffsetRanges, его необходимо сопоставить непосредственно после создания преобразования потока kafka, и только тогда базовый RDD фактически реализует этот признак. Вам нужно немедленно transform над потоком:

val streamAfterTransform = kafkaInputStream.transform { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges]
  // To stuff with ranges
}
person Yuval Itzchakov    schedule 24.06.2017