История заключается в том, что когда я пытаюсь решить проблему с большими данными в нашем задании Spark, которое собирало все данные в узел драйвера вместо того, чтобы получать и обрабатывать данные в каждом разделе, а затем получило ошибку сериализации

org.apache.spark.SparkException: Task not serializable

Первое, что я сделал, — это попытался найти соответствующую информацию об этой ошибке на Stack Overflow, Medium и т. д. К счастью, я получил несколько советов, (ссылки в приложении), которые, я думаю, помогли мне узнать больше о как выполняется задание Spark, что такое ленивая оценка и некоторые примечания, когда мы используем flatMap или mapPartition и т. д. И я поделюсь здесь со всеми, у кого возникает та же проблема, что и у меня, это может сэкономить много времени для решения вашей проблемы.

Зачем нам нужна сериализация в задании Spark?

Механизм Spark разделит задание на несколько дочерних заданий, отправит их исполнителям в кластерной сети Spark и запустит на них. Для этого Spark сериализует объект в поток байтов, который можно вернуть обратно в копию объекта.

И если есть переменная, ссылка внутри вашего объекта не может быть сериализована, тогда Spark выдаст ошибку Task not serializable.

Как это решить?

Трудно сказать, что именно уловило проблемы с сериализацией, потому что это зависит от вашего кода задания Spark. Но есть несколько советов, которые вы должны попробовать:

  • Как можно чаще объявляйте свои переменные и функции внутри Object. Объект выглядит как глобальная переменная, и мы можем использовать его в любом месте вашего кода.
  • Объявите, что ваш собственный класс расширяет Serializable, чтобы убедиться, что ваш класс будет передан правильно.
  • Если есть переменная, которую нельзя сериализовать, вы можете использовать аннотацию @transient следующим образом:
@transient lazy val queue: AmazonSQS = AWSQueueUtils.getClient(config)
  • Если вы используете flatMap или mapPartitions, вы должны следовать приведенной ниже практике из-за использования TravelsableOne
val result: Int = df.rdd.mapPartitions(p => Iterator {
  // Collect data per partition first
  val partition: Seq[Row] = p.toList
  // Any calculation on partition val
  ...
}).collect().sum

Как его отладить?

Важным ключом к решению этой проблемы является отладка и точное определение того, какая переменная или ссылка вызвала ошибку сериализации.

Во-первых, Spark использует SerializationDebugger в качестве отладчика по умолчанию для обнаружения проблем с сериализацией, но иногда он может столкнуться с ошибкой JVM.

SerializationDebugger:java.lang.StackOverflowError

Вы также можете отключить его, включив флаги расширенной отладочной информации в JVM, задав следующие свойства в Конфигурации Spark при создании кластера или добавив их в spark-submit параметры

spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true

Во-вторых, если вы развертываете задание Spark в режиме YARN, иногда вы не можете получить журнал (распечатать на консоли) для заданий executors. Вам нужно установить yarn.log-aggregation-enable=true в настройках вашего кластера.

После этого я надеюсь, что эти заметки помогут вам легче изучить и решить проблему с сериализацией Spark. А также большое спасибо всем авторам в приложении ниже, все ваши заметки очень помогли мне решить мою собственную проблему.

Приложение