Это может быть глупый вопрос, но я боролся в течение довольно долгого времени. Это действительно похоже на этот вопрос, но я не смог его применить в моем коде (дуэт с шаблонами или функцией).
Я хочу передать функцию преобразования flatMap (или карты) в аргумент функции, а затем передать ее функции стратегии, которая фактически вызывает метод df.rdd.flatMap. попробую объяснить!
case class Order(id: String, totalValue: Double, freight: Double)
case class Product(id: String, price: Double)
... or any other case class, whatever one needs to transform a row into ...
Класс сущности:
class Entity(path: String) = {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
this.getStrategy.flatMap[T](mapFunction)
return this
}
def save(path: String): Unit = {
... write logic ...
}
}
Сущность может иметь разные стратегии для своих методов. EntityStrategy выглядит следующим образом:
abstract class EntityStrategy(private val entity: Entity,
private val spark: SparkSession) {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
def map[T](mapFunction: (Row) => T)
}
И один пример реализации EntityStrategy:
class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
extends EntityStrategy(entity, spark) {
...
override def map[T](mapFunction: Row => T): Unit = {
val rdd = this.getData.rdd.map(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
var rdd = this.getData.rdd.flatMap(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
}
Наконец, я хотел бы создать функцию flatMap/map и вызвать ее следующим образом:
def transformFlatMap(row: Row): ArrayBuffer[Order] = {
var orders = new ArrayBuffer[Order]
var _deliveries = row.getAs[Seq[Row]]("deliveries")
_deliveries.foreach(_delivery => {
var order = Order(
id = row.getAs[String]("id"),
totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
orders += order
})
return orders
}
val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")
Конечно, это не работает. Я получаю сообщение об ошибке на SparkEntityStrategy:
Ошибка: (95, 35) ClassTag не доступен для T val rdd = this.getData.rdd.map(f = mapFunction)
Я попытался добавить (implicit encoder: Encoder: T)
как к методам сущности, так и к методам стратегии, но это не помогло. Возможно, я сделал что-то не так, так как я новичок в Scala.
Если я уберу "Т" и передам фактический класс case, все будет работать нормально.