У меня есть функция под названием createTimeLineDS, которая принимает другую функцию в качестве входных данных и помещает эту функцию во внутренний метод карты набора данных. createTimeLineDS только применяет трейты к сигнатуре типа входной функции, в то время как Map требует, чтобы функция возвращала что-то вроде Trait Encoder.
По какой-то причине, когда я помещаю в эту функцию функцию, возвращающую класс case, она выдает ошибку:
Unable to find encoder for type TIMELINE. An implicit Encoder[TIMELINE] is needed to store TIMELINE instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] .map({ case ((event, team), user) =>
convertEventToTimeLineFunction(event, team, user)})
Код ниже, у меня определены все черты и классы case. Что-то не так, это последняя функция, и ее вызов вызывает ошибку, указанную выше. У меня есть import sparkSession.implicits._, поэтому я не уверен, как это сделать правильно.
черты характера, классы случаев и функция, которая используется в качестве параметра:
trait Event {
val teamId: String
val actorId: String
}
trait TimeLine {
val teamDomain: Option[String]
val teamName: Option[String]
val teamIsTest: Option[Boolean]
val actorEmail: Option[String]
val actorName: Option[String]
}
case class JobEventTimeline(
jobId: String,
jobType: Option[String],
inPlanning: Option[Boolean],
teamId: String,
actorId: String,
adminActorId: Option[String],
sessionId: String,
clientSessionId: Option[String],
clientCreatedAt: Long,
seqId: Long,
isSideEffect: Option[Boolean],
opAction: String,
stepId: Option[String],
jobBaseStepId: Option[String],
fieldId: Option[String],
serverReceivedAt: Option[Long],
// "Enriched" data. Data is pulled in from other sources during stream processing
teamDomain: Option[String] = None,
teamName: Option[String] = None,
teamIsTest: Option[Boolean] = None,
actorEmail: Option[String] = None,
actorName: Option[String] = None
) extends TimeLine
def createJobEventTimeLine(jobEvent: CaseClassJobEvent, team: Team, user: User): JobEventTimeline = {
JobEventTimeline(
jobEvent.jobId,
jobEvent.jobType,
jobEvent.inPlanning,
jobEvent.teamId,
jobEvent.actorId,
jobEvent.adminActorId,
jobEvent.sessionId,
jobEvent.clientSessionId,
jobEvent.clientCreatedAt,
jobEvent.seqId,
jobEvent.isSideEffect,
jobEvent.opAction,
jobEvent.stepId,
jobEvent.jobBaseStepId,
jobEvent.fieldId,
jobEvent.serverReceivedAt,
Some(team.domain),
Some(team.name),
Some(team.is_test),
Some(user.email),
Some(user.name)
)
}
Проблемная функция и вызов функции:
def createTimeLineDS[EVENT <: Event with Serializable, TIMELINE <: TimeLine]
(convertEventToTimeLineFunction: (EVENT, Team, User) => TIMELINE)
(sparkSession: SparkSession)
(jobEventDS: Dataset[EVENT]): Dataset[TIMELINE] = {
import sparkSession.implicits._
val teamDS = FuncUtils.createDSFromPostgresql[Team](sparkSession)
val userDS = FuncUtils.createDSFromPostgresql[User](sparkSession)
jobEventDS
.joinWith(teamDS, jobEventDS("teamId") === teamDS("id"), "left_outer")
.joinWith(userDS, $"_1.actorId" === userDS("id"), "left_outer")
.map({ case ((event, team), user) => convertEventToTimeLineFunction(event, team, user)})
Вызов функции:
val jobEventTimeLine = FuncUtils.createTimeLineDS(JobEventTimeline.createJobEventTimeLine)(sparkSession)(jobEventDS)