Я пытаюсь запускать SQL-запросы по потоковым данным в искре. Это выглядит довольно просто, но когда я пытаюсь это сделать, я получаю сообщение об ошибке table not found : tablename>. Он не может найти таблицу, которую я зарегистрировал.
Использование Spark SQL с пакетными данными работает нормально, поэтому я думаю, что это связано с тем, как я вызываю streamingcontext.start(). Есть идеи, в чем проблема? Вот код:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
object Streaming {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val sqc = new SQLContext(sc);
import sqc.createSchemaRDD
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
lines.foreachRDD(rdd=>rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
// lines.foreachRDD(rdd=>rdd.foreach(println))
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
ssc.start()
ssc.awaitTermination()
}
}
Любые предложения приветствуются. Спасибо.