Разница в разделах Spark SQL Shuffle

Я пытаюсь понять Spark Sql Shuffle Partitions, для которого по умолчанию установлено значение 200. Данные выглядят следующим образом, за ними следует количество разделов, созданных для двух случаев.

scala> flightData2015.show(3)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+

scala> println(flightData2015.sort("DEST_COUNTRY_NAME").rdd.getNumPartitions)
104

scala> println(flightData2015.groupBy("DEST_COUNTRY_NAME").count().rdd.getNumPartitions)
200

Оба случая вызывают стадию перемешивания, которая должна привести к 200 разделам (значение по умолчанию). Может кто-нибудь объяснить, почему есть разница?


person Vedant Kumar    schedule 22.06.2020    source источник


Ответы (2)


Согласно документации Spark существует два способа перераспределения данных. Один - через эту конфигурацию spark.sql.shuffle.partitions по умолчанию 200 и всегда применяется при запуске любого соединения или агрегирования, как вы можете видеть здесь.

Когда мы говорим о sort(), это не так просто, Spark использует планировщик, чтобы определить, насколько искажены данные в наборе данных. Если он не слишком перекошен, вместо использования sort-merge join, что приведет к созданию 200 разделов, как вы ожидали, он предпочитает выполнять broadcast данных по разделам, избегая полного перемешивания. Это может сэкономить время во время сортировки, чтобы уменьшить объем сетевого трафика подробнее здесь.

person Thiago Baldim    schedule 22.06.2020

Разница между этими двумя ситуациями в том, что sort и groupBy используют другой разделитель под капотом.

  1. groupBy - использует hashPartitioning, что означает, что он вычисляет хэш ключа, а затем вычисляет pmod на 200 (или любое другое значение, заданное как количество перемешиваемых разделов), поэтому он всегда будет создавать 200 разделов (даже если некоторые из них могут быть пустыми)
  2. _6 _ / _ 7_ - использует rangePartitioning, что означает, что он запускает отдельное задание для выборки данных и на основе этого создает границы для разделов (пытаясь сделать их 200). Теперь, основываясь на распределении выборочных данных и фактическом количестве строк, он может создать границы менее 200, поэтому вы получили только 104.
person David Vrba    schedule 22.06.2020