Оконная функция Spark плотности_ранка — без предложения partitionBy

Я использую Spark 1.6.2, Scala 2.10.5 и Java 1.7.

Наш вариант использования требует, чтобы мы выполнили плотности_rank() для набора данных из более чем 200 миллионов строк без использования предложения partitionBy, используется только предложение orderBy. В настоящее время это выполняется в MSSQL и занимает около 30 минут.

Я реализовал аналогичную логику в Spark, как показано ниже:

val df1 = hqlContext.read.format("jdbc").options(
  Map("url" -> url, "driver" -> driver,
  "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()

df1.cache()

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))

Я отправляю задание в режиме Yarn-cluster, как показано ниже. У меня есть кластер Hadoop 2.6 с двумя узлами, каждый с 4 виртуальными ядрами и 32 ГБ памяти.

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master  yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar

В журналах я вижу, что таблица из примерно 200 миллионов строк из MSSQL импортируется и кэшируется в Spark за 15 минут. Я вижу, что до этого этапа используется около 5 ГБ памяти, и еще свободно около 6,2 ГБ памяти на одном из исполнителей и 11 ГБ памяти на другом.

Но шаг в плотности_rank () всегда завершается с ошибкой «Превышен лимит накладных расходов GC» через несколько минут. Я даже установил объем памяти драйвера до 7 г, как вы можете заметить в приведенной выше команде spark-submit. Но, безрезультатно!. Конечно, я понимаю, что отсутствие раздела partitionBy на самом деле вызывает проблемы в Spark. Но, к сожалению, это тот вариант использования, с которым нам приходится иметь дело.

Не могли бы вы пролить больше света здесь? Я что-то упускаю ? Есть ли альтернатива использованию оконной функции плотности_ранка в Spark? Как, например, использование функции «zipWithIndex», предложенной одним из экспертов в другом месте на этом форуме? Будет ли это давать те же результаты, что и при плотности_rank, поскольку я понимаю, что метод «zipWithIndex» копирует функцию row_number() в отличие от плотности_rank?

Любые полезные советы приветствуются! Большое спасибо!


person Prash    schedule 02.01.2017    source источник


Ответы (1)


Здесь есть две разные проблемы:

  • Вы загружаете данные через соединение JDBC, не предоставляя столбец секционирования или предикаты секционирования. Это загружает все данные, используя один поток исполнителя.

    Эту проблему обычно довольно легко решить, либо используя один из существующих столбцов, либо предоставляя искусственный ключ.

  • Вы используете оконные функции без partitionBy. В результате все данные перетасовываются в один раздел, сортируются локально и обрабатываются в одном потоке.

    В общем, не существует универсального решения, которое может решить эту проблему, используя только Dataset API, но есть некоторые приемы, которые вы можете использовать:

    • Создание искусственных разделов, отражающих требуемый порядок записей. Я описал этот метод в своем ответе на Избегайте влияния на производительность режима одного раздела в оконных функциях Spark

      Аналогичный метод можно использовать и в вашем случае, но он потребует многоэтапного процесса, аналогичного описанному ниже.

    • С помощью ассоциативных методов вы можете использовать два отдельных сканирования по отсортированному RDD (должна быть возможность сделать то же самое без преобразования из Dataset) и дополнительное действие:

      • Compute partial results for each partition (In your case rank for a given partition).
      • Соберите необходимые сводки (в вашем случае границы разделов и накопленное значение ранга для каждого раздела).
      • Выполните второе сканирование, чтобы исправить частичные агрегаты из предыдущих разделов.

    Один из примеров этого подхода, который можно легко настроить в соответствии с вашим случаем, можно найти в разделе Как вычислить совокупную сумму с помощью Spark

person zero323    schedule 02.01.2017
comment
Большое спасибо за ваши предложения! Мне удалось сократить время импорта данных из MSSQL с помощью параметров partitionColumn в источнике данных JDBC. Но рекомендации по плотному ранжированию без partitionBy мне потребуются больше времени, чтобы переварить, так как я новичок в Scala. Но, большое спасибо за руководство! - person Prash; 05.01.2017