Spark rdd правильный формат даты в scala?

Это значение даты, которое я хочу использовать при преобразовании RDD в Dataframe.

Sun Jul 31 10:21:53 PDT 2016

Эта схема DataTypes.DateType вызывает ошибку.

java.util.Date is not a valid external type for schema of date

Поэтому я хочу заранее подготовить RDD таким образом, чтобы вышеуказанная схема могла работать. Как я могу исправить формат даты, чтобы он работал при преобразовании в фрейм данных?

//Schema for data frame
val schema =
                StructType(
                    StructField("lotStartDate", DateType, false) ::
                    StructField("pm", StringType, false) ::
                    StructField("wc", LongType, false) ::
                    StructField("ri", StringType, false) :: Nil)

// rowrdd : [Sun Jul 31 10:21:53 PDT 2016,"PM",11,"ABC"]
val df = spark.createDataFrame(rddRow,schema)

person Interstellar    schedule 26.01.2018    source источник
comment
Можете ли вы показать, как вы пытаетесь преобразовать RDD в DataFrame? Был бы полезен минимальный, полный и проверяемый пример (код, который создает образец RDD, преобразует его в DF и выдает ошибку вы сообщили).   -  person Tzach Zohar    schedule 27.01.2018


Ответы (1)


DateType Spark можно закодировать из java.sql.Date, поэтому вам следует преобразовать входной RDD для использования этого типа, например:

val inputRdd: RDD[(Int, java.util.Date)] = ??? // however it's created

// convert java.util.Date to java.sql.Date:
val fixedRdd = inputRdd.map {
  case (id, date) => (id, new java.sql.Date(date.getTime))
}

// now you can convert to DataFrame given your schema:
val schema = StructType(
  StructField("id", IntegerType) :: 
  StructField("date", DateType) :: 
  Nil
)

val df = spark.createDataFrame(
  fixedRdd.map(record => Row.fromSeq(record.productIterator.toSeq)),
  schema
)

// or, even easier - let Spark figure out the schema:
val df2 = fixedRdd.toDF("id", "date")

// both will evaluate to the same schema, in this case
person Tzach Zohar    schedule 26.01.2018
comment
Работал!! Большое спасибо!! - person Interstellar; 28.01.2018
comment
java.lang.IndexOutOfBoundsException Error возникает при преобразовании RDD в Dataframe обоими способами. Я также очищаю RDD, например val rddFinal = rddFlat.filter(x => (x._1 != null && x._1.isInstanceOf[Date] && x._2 != null && x._3 != null && x._4 != null && x._2.length>0 && x._4.length>0)). Ссылка: [stackoverflow.com/questions/48416695/ Это работало без даты в RDD. Но на сегодняшний день это не решает проблемы. - person Interstellar; 28.01.2018