Производительность искры не повышается

Я использую Zeppelin для чтения avro-файлов размером в ГБ и записи в миллиардах. Я пытался использовать 2 экземпляра и 7 экземпляров на AWS EMR, но производительность кажется одинаковой. С экземплярами 7 это все еще занимает много времени. Код:

val snowball = spark.read.avro(snowBallUrl + folder + prefix + "*.avro")
val prod = spark.read.avro(prodUrl + folder + prefix + "*.avro")

snowball.persist()
prod.persist()

val snowballCount = snowball.count()
val prodCount = prod.count()

val u = snowball.union(prod)  

Output:
snowballCount: Long = 13537690
prodCount: Long = 193885314

А ресурсы можно посмотреть здесь:

введите здесь описание изображения

Для параметра spark.executor.cores установлено значение 1. Если я попытаюсь изменить этот номер, Zeppelin не будет работать и spark context выключится. Было бы здорово, если бы кто-то мог немного подсказать, чтобы улучшить производительность.

Изменить: я проверил, сколько разделов он создал:

snowball.rdd.partitions.size
prod.rdd.partitions.size
u.rdd.partitions.size

res21: Int = 55
res22: Int = 737
res23: Int = 792

person Waqar Ahmed    schedule 18.05.2018    source источник
comment
Можете ли вы проверить, сколько реальных файлов у вас есть? У меня такое ощущение, что это число больше Integer.MAX_VALUE.   -  person vvg    schedule 22.05.2018
comment
У меня было 2 файла по 3 ГБ и примерно 300 файлов по 250 МБ. Эта ошибка возникает, потому что размер раздела превышает этот предел.   -  person Waqar Ahmed    schedule 22.05.2018
comment
Возможный дубликат SQL-запроса в Spark/scala Размер превышает целое число .MAX_VALUE   -  person philantrovert    schedule 22.05.2018
comment
@philantrovert уже указал вам на корень проблемы No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes) so you need more / smaller partitions. Пробовали ли вы увеличить количество разделов в RDD u с помощью функции repartition?   -  person jose.goncabel    schedule 22.05.2018
comment
@jose.goncabel jose.goncabel Не могли бы вы описать, как я могу определить стратегию перераспределения в зависимости от размера? Если размер в ГБ, то переразметка стоит, но если размер в МБ и мал, то переразметка не стоит. Более того, я не знаю, как получить размер кадра данных во время выполнения, чтобы выполнить переразметку.   -  person Waqar Ahmed    schedule 23.05.2018
comment
почему вы используете persist, тогда как ваши данные не были изменены?   -  person Setop    schedule 23.05.2018
comment
@Setop Я просто тестировал это. Потому что сначала я использую count. И после этого в объединении он будет пересчитывать фрейм данных, поэтому я проверял, улучшит ли сохранение производительность или нет.   -  person Waqar Ahmed    schedule 23.05.2018
comment
@WaqarAhmed хорошо. Еще один способ спросить: вам действительно нужно считать или объединение является самой важной частью процесса?   -  person Setop    schedule 23.05.2018
comment
@Setop Union — важнейшая задача. На самом деле я объединяю 2 разных кадра данных размером около 90 ГБ и удаляю из них дубликаты.   -  person Waqar Ahmed    schedule 23.05.2018
comment
@WaqarAhmed, удаление дубликатов - это не очень хорошо масштабируемый процесс. Может быть, вы можете помочь, предоставив хэш-функцию. Но не знаю как.   -  person Setop    schedule 23.05.2018
comment
Думаю, хэш-функция @Setop не будет работать. Потому что идентификаторы хэшируются, и для одного идентификатора будет создан один раздел.   -  person Waqar Ahmed    schedule 23.05.2018
comment
Во-первых, вам нужно проверить пользовательский интерфейс искры, сколько исполнителей вы выделили и сколько задач для каждого этапа.   -  person zjffdu    schedule 24.05.2018