Попытка запустить SparkSQL через Spark Streaming

Я пытаюсь запускать SQL-запросы по потоковым данным в искре. Это выглядит довольно просто, но когда я пытаюсь это сделать, я получаю сообщение об ошибке table not found : tablename>. Он не может найти таблицу, которую я зарегистрировал.

Использование Spark SQL с пакетными данными работает нормально, поэтому я думаю, что это связано с тем, как я вызываю streamingcontext.start(). Есть идеи, в чем проблема? Вот код:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

object Streaming {

def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd=>rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd=>rdd.foreach(println))
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}

Любые предложения приветствуются. Спасибо.


person Pravesh Jain    schedule 21.08.2014    source источник


Ответы (1)


Ну, я узнал проблему. Вы должны запросить данные в функции foreachRDD, иначе таблица не будет распознана. Что-то вроде этого работает:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration

object Mlist {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))

    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val sqc = new SQLContext(sc);
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created

    lines.foreachRDD(rdd=>{
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
person Pravesh Jain    schedule 21.08.2014
comment
хороший розыгрыш! Интересно, применимо ли что-то подобное к структурированной потоковой передаче. У меня возникли проблемы с тем, чтобы заставить sql работать с ним последовательно / понятным образом. - person WestCoastProjects; 11.07.2019