Потоковая передача Spark не запоминает предыдущее состояние

Я написал программу потоковой передачи искры с преобразованием состояния. Похоже, что мое приложение для потоковой передачи искры правильно выполняет вычисления с контрольными точками. Но если я завершу свою программу и запущу ее снова, она не прочитает данные предыдущих контрольных точек и не начнет с самого начала. Это ожидаемое поведение?

Нужно ли что-то менять в моей программе, чтобы она запоминала предыдущие данные и начинала вычисления оттуда?

Заранее спасибо.

Для справки моя программа:

 def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream(<hostname>, 9999)
    ssc.checkpoint("hdfs://<hostname1>:8020/user/spark/checkpoints_dir")
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }
}

person Vibhuti    schedule 27.02.2016    source источник


Ответы (2)


В соответствии с документацией по потоковой передаче искры вы должны инициализировать контекст немного по-другому:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

см. контрольные точки

person vitalii    schedule 27.02.2016

Как это описано в документации по контрольным точкам вам нужно настроить свой код, чтобы иметь возможность восстанавливать состояние из контрольных точек.

В частности, вы не можете создать StreamingContext напрямую, но должны использовать StreamingContext.getOrCreate, который принимает:

  • каталог контрольных точек
  • функция, которую можно использовать для настройки контекста (Unit => StreamingContext)
person Community    schedule 27.02.2016
comment
Спасибо, zero323. Я смог это сделать. Теперь он запоминает предыдущее состояние. - person Vibhuti; 28.02.2016