История заключается в том, что когда я пытаюсь решить проблему с большими данными в нашем задании Spark, которое собирало все данные в узел драйвера вместо того, чтобы получать и обрабатывать данные в каждом разделе, а затем получило ошибку сериализации
org.apache.spark.SparkException: Task not serializable
Первое, что я сделал, — это попытался найти соответствующую информацию об этой ошибке на Stack Overflow, Medium и т. д. К счастью, я получил несколько советов, (ссылки в приложении), которые, я думаю, помогли мне узнать больше о как выполняется задание Spark, что такое ленивая оценка и некоторые примечания, когда мы используем flatMap
или mapPartition
и т. д. И я поделюсь здесь со всеми, у кого возникает та же проблема, что и у меня, это может сэкономить много времени для решения вашей проблемы.
Зачем нам нужна сериализация в задании Spark?
Механизм Spark разделит задание на несколько дочерних заданий, отправит их исполнителям в кластерной сети Spark и запустит на них. Для этого Spark сериализует объект в поток байтов, который можно вернуть обратно в копию объекта.
И если есть переменная, ссылка внутри вашего объекта не может быть сериализована, тогда Spark выдаст ошибку Task not serializable
.
Как это решить?
Трудно сказать, что именно уловило проблемы с сериализацией, потому что это зависит от вашего кода задания Spark. Но есть несколько советов, которые вы должны попробовать:
- Как можно чаще объявляйте свои переменные и функции внутри Object. Объект выглядит как глобальная переменная, и мы можем использовать его в любом месте вашего кода.
- Объявите, что ваш собственный класс расширяет
Serializable
, чтобы убедиться, что ваш класс будет передан правильно. - Если есть переменная, которую нельзя сериализовать, вы можете использовать аннотацию
@transient
следующим образом:
@transient lazy val queue: AmazonSQS = AWSQueueUtils.getClient(config)
- Если вы используете
flatMap
илиmapPartitions
, вы должны следовать приведенной ниже практике из-за использованияTravelsableOne
val result: Int = df.rdd.mapPartitions(p => Iterator { // Collect data per partition first val partition: Seq[Row] = p.toList // Any calculation on partition val ... }).collect().sum
Как его отладить?
Важным ключом к решению этой проблемы является отладка и точное определение того, какая переменная или ссылка вызвала ошибку сериализации.
Во-первых, Spark использует SerializationDebugger в качестве отладчика по умолчанию для обнаружения проблем с сериализацией, но иногда он может столкнуться с ошибкой JVM.
SerializationDebugger:java.lang.StackOverflowError
Вы также можете отключить его, включив флаги расширенной отладочной информации в JVM, задав следующие свойства в Конфигурации Spark при создании кластера или добавив их в spark-submit
параметры
spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
Во-вторых, если вы развертываете задание Spark в режиме YARN, иногда вы не можете получить журнал (распечатать на консоли) для заданий executors. Вам нужно установить yarn.log-aggregation-enable=true
в настройках вашего кластера.
После этого я надеюсь, что эти заметки помогут вам легче изучить и решить проблему с сериализацией Spark. А также большое спасибо всем авторам в приложении ниже, все ваши заметки очень помогли мне решить мою собственную проблему.