Защо задачата на Spark отнема много време, за да намери блок локално?

RDD има 512 дяла с еднакъв размер и е 100% кеширана в паметта на 512 изпълнителя.

Имам работа за събиране на филтър-карта с 512 задачи. Понякога тази работа завършва под секунда. В други случаи 50% от задачите се изпълняват за под секунда, 45% от задачите отнемат 10 секунди и 5% от задачите отнемат 20 секунди.

Ето лог от изпълнител, при който задачата отне 20 секунди:

15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312 
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB) 
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally 
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver 

Така изглежда, че 20-те секунди са изразходвани за намиране на локалния блок. Разглеждането на регистрационните файлове за други бавни задачи показва, че всички те са забавени по същата причина. Разбирам, че локален блок означава в рамките на един и същ екземпляр на JVM и затова не разбирам защо отнема толкова време, за да го намерите.

Тъй като забавянето винаги е или точно 10 секунди, или точно 20 секунди, подозирам, че се дължи на 10-секундно изчакване на някой слушател или нещо подобно. Ако това е вярно, тогава предполагам, че моите възможности са или да разбера защо времето за изчакване изтича и да го поправя, или да направя времето за изчакване по-кратко, така че да се опитва по-често.

Защо задачата отнема толкова време за намиране на локален блок и как мога да разреша това?

Актуализация: Добавяне на журнал за ОТСТРАНЯВАНЕ НА ГРЕШКИ за org.apache.spark.storage.

16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029)
16/02/01 12:14:07 DEBUG Executor: Task 3029's epoch is 1
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162)
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took  2 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took  2 ms
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took  1 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took  1 ms
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668)
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115

person user2179977    schedule 20.01.2016    source източник
comment
Бихте ли могли да включите нивото на ОТСТРАНЯВАНЕ НА ГРЕШКИ на влизане в пакета org.apache.spark.storage и да споделите резултатите? Проверих кода на BlockManager и има много неща, които могат да се случат по време на метода doGetLocal и има записи в регистрационния файл на ниво отстраняване на грешки, които биха помогнали да се разбере какво точно прави. Между другото, Found block rdd_5_215 locally означава, че го е намерил в локалния BlockManager (не в отдалечения), но може да получава блок от паметта, от диск или от външно хранилище.   -  person Alex Larikov    schedule 29.01.2016
comment
Благодаря, @AlexLarikov, добавих дневника за DEBUG. Когато гледам уеб интерфейса на Spark, той ми казва, че RDD е 100% кеширан в паметта. При това положение все още ли е разумно Spark да извлече блок от диска?   -  person user2179977    schedule 02.02.2016


Отговори (2)


Единственото нещо, което изглежда ми прави впечатление е, че имате включена репликация чрез вашето ниво на съхранение StorageLevel(false, true, false, true, 1)

Тъй като имате 512 дяла в 512 изпълнителя, той може да репликира блоковете във всеки изпълнител, което може да причини това забавяне в края. Бих се опитал да изключа репликацията и да видя какво прави това за производителността.

person Stephen Carman    schedule 03.05.2016

Колко общо ядра заделяте за вашето приложение Spark? Това може да се случи, ако разпределяте 256 ядра и ако стойността за spark.locality.wait е 10.

Не познавам средата ви, но изглежда имате твърде много изпълнители. Имайте само няколко изпълнители (в зависимост от това колко мощни са вашите изчислителни възли) и разполагайте с множество ядра, достъпни за всеки изпълнител. Накратко, вместо да имате много процеси с по 1 нишка всеки, имайте няколко процеса с много нишки всеки.

person ACH    schedule 15.07.2016