Передайте функцию с любым возвращаемым типом класса case в качестве параметра

Это может быть глупый вопрос, но я боролся в течение довольно долгого времени. Это действительно похоже на этот вопрос, но я не смог его применить в моем коде (дуэт с шаблонами или функцией).

Я хочу передать функцию преобразования flatMap (или карты) в аргумент функции, а затем передать ее функции стратегии, которая фактически вызывает метод df.rdd.flatMap. попробую объяснить!

case class Order(id: String, totalValue: Double, freight: Double) 
case class Product(id: String, price: Double) 

... or any other case class, whatever one needs to transform a row into ...

Класс сущности:

class Entity(path: String) = {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
      this.getStrategy.flatMap[T](mapFunction)
      return this
  }
  def save(path: String): Unit = {
      ... write logic ...
  } 
}

Сущность может иметь разные стратегии для своих методов. EntityStrategy выглядит следующим образом:

abstract class EntityStrategy(private val entity: Entity,
                              private val spark: SparkSession) {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
  def map[T](mapFunction: (Row) => T)
}

И один пример реализации EntityStrategy:

class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
  extends EntityStrategy(entity, spark) {
  ...
  override def map[T](mapFunction: Row => T): Unit = {
    val rdd = this.getData.rdd.map(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }

  override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
    var rdd = this.getData.rdd.flatMap(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }
}

Наконец, я хотел бы создать функцию flatMap/map и вызвать ее следующим образом:

def transformFlatMap(row: Row): ArrayBuffer[Order] = {
    var orders = new ArrayBuffer[Order]
    var _deliveries = row.getAs[Seq[Row]]("deliveries")
    _deliveries.foreach(_delivery => {
       var order = Order(
           id = row.getAs[String]("id"),
           totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
      orders += order
    })
   return orders
}

val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")

Конечно, это не работает. Я получаю сообщение об ошибке на SparkEntityStrategy:

Ошибка: (95, 35) ClassTag не доступен для T val rdd = this.getData.rdd.map(f = mapFunction)

Я попытался добавить (implicit encoder: Encoder: T) как к методам сущности, так и к методам стратегии, но это не помогло. Возможно, я сделал что-то не так, так как я новичок в Scala.

Если я уберу "Т" и передам фактический класс case, все будет работать нормально.


person leovrf    schedule 22.02.2019    source источник


Ответы (1)


Оказывается, чтобы компилятор и методы Spark были удовлетворены, мне нужно было добавить следующие теги типов:

[T <: scala.Product : ClassTag : TypeTag]

Таким образом, оба метода стали:

def map[T <: Product : ClassTag : TypeTag](mapFunction: (Row) => T): Entity
def flatMap[T <: scala.Product : ClassTag : TypeTag](mapFunction: (Row) => TraversableOnce[T]): Entity

О scala.Product:

Базовый трейт для всех продуктов, которые в стандартной библиотеке включают по крайней мере от scala.Product1 до scala.Product22 и, следовательно, также их подклассы от scala.Tuple1 до scala.Tuple22. Кроме того, все классы case реализуют Product с помощью синтетически сгенерированных методов.

Поскольку я использую объект класса case в качестве типа возвращаемого значения моей функции, мне понадобился scala.Product, чтобы createDataFrame Spark мог соответствовать правильной перегрузке .

Почему и ClassTag, и TypeTag?

При удалении TypeTag компилятор выдает следующую ошибку:

Ошибка: (96, 48) TypeTag не доступен для T this.dataFrame = this.spark.createDataFrame(rdd)

И удаление ClassTag:

Ошибка: (95, 35) ClassTag не доступен для T val rdd = this.getData.rdd.map(f = mapFunction)

Их добавление удовлетворило оба метода, и все заработало, как ожидалось.

Нашел хорошую статью, объясняющую стирание типов в Scala.

person leovrf    schedule 23.02.2019