Как в scala заставить кодировщик работать с типом, когда он вставлен в общую функцию, которая применяет только определенные черты?

У меня есть функция под названием createTimeLineDS, которая принимает другую функцию в качестве входных данных и помещает эту функцию во внутренний метод карты набора данных. createTimeLineDS только применяет трейты к сигнатуре типа входной функции, в то время как Map требует, чтобы функция возвращала что-то вроде Trait Encoder.

По какой-то причине, когда я помещаю в эту функцию функцию, возвращающую класс case, она выдает ошибку:

    Unable to find encoder for type TIMELINE. An implicit Encoder[TIMELINE] is needed to store TIMELINE instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    [error]       .map({ case ((event, team), user) =>  

convertEventToTimeLineFunction(event, team, user)})

Код ниже, у меня определены все черты и классы case. Что-то не так, это последняя функция, и ее вызов вызывает ошибку, указанную выше. У меня есть import sparkSession.implicits._, поэтому я не уверен, как это сделать правильно.

черты характера, классы случаев и функция, которая используется в качестве параметра:

trait Event {
  val teamId: String
  val actorId: String
}

trait TimeLine {
  val teamDomain: Option[String]
  val teamName: Option[String]
  val teamIsTest: Option[Boolean]
  val actorEmail: Option[String]
  val actorName: Option[String]
}  

case class JobEventTimeline(
                         jobId: String,
                         jobType: Option[String],
                         inPlanning: Option[Boolean],

                         teamId: String,
                         actorId: String,
                         adminActorId: Option[String],
                         sessionId: String,
                         clientSessionId: Option[String],
                         clientCreatedAt: Long,
                         seqId: Long,
                         isSideEffect: Option[Boolean],

                         opAction: String,
                         stepId: Option[String],
                         jobBaseStepId: Option[String],
                         fieldId: Option[String],

                         serverReceivedAt: Option[Long],

                         // "Enriched" data. Data is pulled in from other sources during stream processing
                         teamDomain: Option[String] = None,
                         teamName: Option[String] = None,
                         teamIsTest: Option[Boolean] = None,

                         actorEmail: Option[String] = None,
                         actorName: Option[String] = None
                       ) extends TimeLine


def createJobEventTimeLine(jobEvent: CaseClassJobEvent, team: Team, user: User): JobEventTimeline = {
    JobEventTimeline(
      jobEvent.jobId,
      jobEvent.jobType,
      jobEvent.inPlanning,
      jobEvent.teamId,
      jobEvent.actorId,
      jobEvent.adminActorId,
      jobEvent.sessionId,
      jobEvent.clientSessionId,
      jobEvent.clientCreatedAt,
      jobEvent.seqId,
      jobEvent.isSideEffect,
      jobEvent.opAction,
      jobEvent.stepId,
      jobEvent.jobBaseStepId,
      jobEvent.fieldId,
      jobEvent.serverReceivedAt,
      Some(team.domain),
      Some(team.name),
      Some(team.is_test),
      Some(user.email),
      Some(user.name)
    )
  }

Проблемная функция и вызов функции:

def createTimeLineDS[EVENT <: Event with Serializable, TIMELINE <: TimeLine]

  (convertEventToTimeLineFunction: (EVENT, Team, User) => TIMELINE)
  (sparkSession: SparkSession)
  (jobEventDS: Dataset[EVENT]): Dataset[TIMELINE] = {
    import sparkSession.implicits._
    val teamDS = FuncUtils.createDSFromPostgresql[Team](sparkSession)
    val userDS = FuncUtils.createDSFromPostgresql[User](sparkSession)
    jobEventDS
      .joinWith(teamDS, jobEventDS("teamId") === teamDS("id"), "left_outer")
      .joinWith(userDS, $"_1.actorId" === userDS("id"), "left_outer")
      .map({ case ((event, team), user) =>  convertEventToTimeLineFunction(event, team, user)})

Вызов функции:

val jobEventTimeLine = FuncUtils.createTimeLineDS(JobEventTimeline.createJobEventTimeLine)(sparkSession)(jobEventDS)

person Brian Yeh    schedule 13.08.2019    source источник
comment
@ LuisMiguelMejíaSuárez, ты гений Scala. Однако он работал с sparkSession в качестве параметра. Мой вопрос к вам: почему это работает?   -  person Brian Yeh    schedule 14.08.2019
comment
Я добавил более подробный ответ с парой ссылок и примеров. Надеюсь, поможет!   -  person Luis Miguel Mejía Suárez    schedule 14.08.2019


Ответы (1)


Самым простым решением было бы сделать это вместо этого:

def createTimeLineDS[EVENT <: Event, TIMELINE <: TimeLine : Encoder](...)

Вам, вероятно, не понадобится параметр sparkSession, а также строка import sparkSession.implicits._.
(но вам могут потребоваться дополнительные изменения, продолжайте читать).

Итак, проблема заключается в том, что _ 4_ метод < / strong> в наборе данных требуется неявное Encoder для типа вывода. Итак, что вы делаете с этим забавным синтаксисом (называемым < strong> context bound) означает, что ваш метод также требует такого неявного, поэтому компилятор будет доволен, пока вызывающий ваш метод предоставляет его (обычно через import spark.implicits._ где-то раньше).

Для получения дополнительной информации о имплицитах, где выполняет ли их поиск компилятором & зачем вам кодировщик, прочтите связанные статьи.


Теперь, когда вы все это прочитали, я бы ожидал, в чем проблема и как ее исправить.
Но, вероятно, вам все равно понадобится явное import sparkSession.implicits._ в вашем методе. Вероятно, это потому, что FuncUtils.createDSFromPostgresql[Team](sparkSession) делает то же самое, но теперь вы знаете, как его реорганизовать.

Кроме того, поскольку Team и User являются конкретными классами, которые вы контролируете, вы можете добавить что-то подобное к их сопутствующим объектам, поэтому вам не нужно запрашивать их кодировщики, потому что они всегда будут в неявной области.

object Team {
  // https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoders$@product[T%3C:Product](implicitevidence$5:reflect.runtime.universe.TypeTag[T]):org.apache.spark.sql.Encoder[T]
  implicit final val TeamEncoder: Encoder[Team] = Encoders.product
}
person Luis Miguel Mejía Suárez    schedule 13.08.2019