Набор данных Spark и java.sql.Date

Допустим, у меня есть Spark Dataset вот так:

scala> import java.sql.Date
scala> case class Event(id: Int, date: Date, name: String)
scala> val ds = Seq(Event(1, Date.valueOf("2016-08-01"), "ev1"), Event(2, Date.valueOf("2018-08-02"), "ev2")).toDS

Я хочу создать новый Dataset только с полями имени и даты. Насколько я понимаю, я могу либо использовать ds.select() с TypedColumn, либо использовать ds.select() с Column, а затем преобразовать DataFrame в Dataset.

Однако я не могу заставить первый вариант работать с типом Date. Например:

scala> ds.select($"name".as[String], $"date".as[Date])
<console>:31: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
              ds.select($"name".as[String], $"date".as[Date])
                                                      ^

Более поздний вариант работает:

scala> ds.select($"name", $"date").as[(String, Date)]
res2: org.apache.spark.sql.Dataset[(String, java.sql.Date)] = [name: string, date: date]

Есть ли способ выбрать Date полей из Dataset, не переходя к DataFrame и обратно?


person Lukáš Lalinský    schedule 05.08.2016    source источник


Ответы (1)


Целый день бился головой о подобных проблемах. Я думаю, вы можете решить свою проблему одной строкой:

implicit val e: Encoder[(String, Date)] = org.apache.spark.sql.Encoders.kryo[(String,Date)]

По крайней мере, у меня это работает.

РЕДАКТИРОВАТЬ

В этих случаях проблема заключается в том, что для большинства Dataset операций Spark 2 требует Encoder, в котором хранится информация о схеме (предположительно для оптимизации). Информация о схеме принимает форму неявного параметра (и несколько Dataset операций имеют такой неявный параметр).

В этом случае OP нашел правильную схему для java.sql.Date, поэтому работает следующее:

implicit val e = org.apache.spark.sql.Encoders.DATE
person Alec    schedule 05.08.2016
comment
Это не решило проблему напрямую, но привело меня на верный путь. Использование implicit val encodeDate = org.apache.spark.sql.Encoders.DATE решает проблему. Я не уверен, почему это не обрабатывается имплицитами по умолчанию. - person Lukáš Lalinský; 06.08.2016