Я использую Spark 1.6.2, Scala 2.10.5 и Java 1.7.
Наш вариант использования требует, чтобы мы выполнили плотности_rank() для набора данных из более чем 200 миллионов строк без использования предложения partitionBy, используется только предложение orderBy. В настоящее время это выполняется в MSSQL и занимает около 30 минут.
Я реализовал аналогичную логику в Spark, как показано ниже:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
Я отправляю задание в режиме Yarn-cluster, как показано ниже. У меня есть кластер Hadoop 2.6 с двумя узлами, каждый с 4 виртуальными ядрами и 32 ГБ памяти.
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
В журналах я вижу, что таблица из примерно 200 миллионов строк из MSSQL импортируется и кэшируется в Spark за 15 минут. Я вижу, что до этого этапа используется около 5 ГБ памяти, и еще свободно около 6,2 ГБ памяти на одном из исполнителей и 11 ГБ памяти на другом.
Но шаг в плотности_rank () всегда завершается с ошибкой «Превышен лимит накладных расходов GC» через несколько минут. Я даже установил объем памяти драйвера до 7 г, как вы можете заметить в приведенной выше команде spark-submit. Но, безрезультатно!. Конечно, я понимаю, что отсутствие раздела partitionBy на самом деле вызывает проблемы в Spark. Но, к сожалению, это тот вариант использования, с которым нам приходится иметь дело.
Не могли бы вы пролить больше света здесь? Я что-то упускаю ? Есть ли альтернатива использованию оконной функции плотности_ранка в Spark? Как, например, использование функции «zipWithIndex», предложенной одним из экспертов в другом месте на этом форуме? Будет ли это давать те же результаты, что и при плотности_rank, поскольку я понимаю, что метод «zipWithIndex» копирует функцию row_number() в отличие от плотности_rank?
Любые полезные советы приветствуются! Большое спасибо!