Определить искру udf путем отражения строки

Я пытаюсь определить udf в spark (2.0) из строки, содержащей определение функции scala. Вот фрагмент:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

Это дает мне ошибку:

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

Однако, когда я определяю udf как:

val f = udf((s:String) => 5)

это работает просто отлично. В чем здесь проблема? Конечная цель состоит в том, чтобы взять строку с определением функции scala и использовать ее как udf.


person sourabh    schedule 09.08.2016    source источник
comment
Похоже, вы столкнулись с этой проблемой — issues.apache.org/jira/browse/ СПАРК-9219   -  person vsminkov    schedule 12.08.2016
comment
@vsminkov Это не так.   -  person    schedule 12.08.2016
comment
Чувак, ты не мог совместить двух более сложных и уродливых монстров, чем Spark и scala Reflection. :)   -  person Alec    schedule 12.08.2016
comment
Опасности профессии! :)   -  person sourabh    schedule 16.08.2016


Ответы (2)


Как заметил Джованни, проблема заключается в том, что загрузчики классов различаются (вы можете исследовать это подробнее, вызвав .getClass.getClassLoader для любого объекта). Затем, когда рабочие пытаются десериализовать вашу отраженную функцию, начинается ад.

Вот решение, которое не требует взлома загрузчика классов. Идея состоит в том, чтобы перенести этап отражения на рабочих. В конечном итоге нам придется повторить шаг отражения, но только один раз для каждого работника. Я думаю, что это довольно оптимально — даже если вы сделали отражение только один раз на главном узле, вам пришлось бы проделать немало работы для каждого рабочего, чтобы заставить их распознать функцию.

val f = udf {
  new Function1[String,Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

Затем вызов sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show работает нормально.

Не стесняйтесь комментировать println - это просто способ подсчитать, сколько раз произошло отражение. В spark-shell --master 'local' это только один раз, а в spark-shell --master 'local[2]' — дважды.

Как это работает

UDF оценивается немедленно, но никогда не используется, пока не достигнет рабочих узлов, поэтому ленивые значения toolbox и func оцениваются только на рабочих узлах. Кроме того, поскольку они ленивы, они оцениваются только один раз для каждого работника.

person Alec    schedule 14.08.2016
comment
можно ли определить типы вместо указания типов String и Int, если это возможно, тогда он обрабатывает любой тип UDF - person Ajit; 07.03.2018

У меня была та же ошибка, и она не показывает ClassNotFoundException, потому что класс JavaDeserializationStream перехватывает исключение, в зависимости от вашей среды он терпит неудачу, потому что он не может найти класс, который вы пытаетесь выполнить, из вашего RDD/DataSet, но он не показывает ClassNotFoundError . Чтобы решить эту проблему, мне пришлось создать банку со всеми классами моего проекта (включая функцию и зависимости) и включить банку в среду искры.

Это для автономного кластера

conf.setJars ( Array ("/fullpath/yourgeneratedjar.jar", "/fullpath/otherdependencies.jar") )

и это для кластера пряжи

conf.set("spark.yarn.jars", "/fullpath/yourgeneratedjar.jar,/fullpath/otherdependencies.jar")
person Giovanny Gutierrez    schedule 11.08.2016
comment
Попытался добавить org.scala-lang:scala-compiler:2.11.8 и org.scala-lang:scala-reflect:2.11.8 специально в --packages list ; но все же ошибка та же. Я все равно включаю все свои зависимости приложений в виде списка координат maven перед запуском задания. - person sourabh; 12.08.2016
comment
@sourabh Я думаю, что нашел проблему: когда вы используете отражение для создания функции, функция доступна только для локального загрузчика классов, когда вы пытаетесь десериализовать функцию, она выдает исключение ClassNotFoundException, потому что функция недоступна для рабочего класса. загрузчики классов, проверьте сгенерированные классы при использовании val f = udf((s:String) => 5), и вы увидите класс MyObject$$anonfunc$... для функции. Я предлагаю сгенерировать файл .class с помощью интерпретатора scala и динамически сгенерировать банку, содержащую этот класс. - person Giovanny Gutierrez; 12.08.2016
comment
@GiovannyGutierrez Можете ли вы переместить последний комментарий в ответ и расширить его? - person ; 13.08.2016
comment
Я думаю, что @alec сделал лучший подход к решению проблемы, используя ленивую инициализацию, и нет необходимости вносить дополнительные изменения. - person Giovanny Gutierrez; 14.08.2016