Коя е най-бързата циклична синхронизация в Java (ExecutorService срещу CyclicBarrier срещу X)?

Коя конструкция за синхронизиране на Java е вероятно да осигури най-добрата производителност за сценарий на едновременна, итеративна обработка с фиксиран брой нишки като този, описан по-долу? След като експериментирах сам за известно време (използвайки ExecutorService и CyclicBarrier) и бях донякъде изненадан от резултатите, ще съм благодарен за някои експертни съвети и може би някои нови идеи. Съществуващите въпроси тук изглежда не се фокусират основно върху ефективността, следователно този нов. Благодаря предварително!

Ядрото на приложението е прост итеративен алгоритъм за обработка на данни, паралелно разпределен на изчислителното натоварване в 8 ядра на Mac Pro, работещ под OS X 10.6 и Java 1.6.0_07. Данните, които трябва да бъдат обработени, се разделят на 8 блока и всеки блок се подава към Runnable, за да се изпълни от една от фиксиран брой нишки. Паралелизирането на алгоритъма беше сравнително лесно и той функционално работи според желанията, но неговата производителност все още не е това, което смятам, че може да бъде. Приложението изглежда прекарва много време в синхронизиране на системни повиквания, така че след известно профилиране се чудя дали съм избрал най-подходящия механизъм(и) за синхронизиране.

Ключово изискване на алгоритъма е, че той трябва да работи на етапи, така че нишките трябва да се синхронизират в края на всеки етап. Основната нишка подготвя работата (много ниски режийни разходи), предава я на нишките, оставя ги да работят върху нея, след което продължава, когато всички нишки са готови, пренарежда работата (отново много ниски режийни) и повтаря цикъла. Машината е посветена на тази задача, събирането на боклука е сведено до минимум чрез използване на пулове на нишки от предварително разпределени елементи и броят на нишките може да бъде фиксиран (без входящи заявки или други подобни, само една нишка на ядро ​​на процесора).

V1 - ExecutorService

Първата ми реализация използва ExecutorService с 8 работни нишки. Програмата създава 8 задачи, които държат работата и след това им позволява да работят върху нея, приблизително така:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}

Това работи добре функционално (прави това, което трябва) и за много големи работни елементи наистина всичките 8 процесора стават силно натоварени, доколкото се очаква да позволи алгоритъмът за обработка (някои работни елементи ще завършат по-бързо от други, след това ще се попречат) . Въпреки това, тъй като работните елементи стават по-малки (а това всъщност не е под контрола на програмата), натоварването на процесора на потребителя се свива драстично:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

Легенда: - размер на блок = размер на работния елемент (= изчислителни стъпки) - система = системно натоварване, както е показано в OS X Activity Monitor (червена лента) - потребител = потребителско натоварване, както е показано в OS X Activity Monitor (зелена лента) - цикли/сек = итерации през основния цикъл while, повече е по-добре

Основната област на безпокойство тук е високият процент време, прекарано в системата, което изглежда се управлява от повиквания за синхронизиране на нишки. Както се очаква, за по-малки работни елементи, ExecutorService.invokeAll() ще изисква относително повече усилия за синхронизиране на нишките в сравнение с количеството работа, извършвана във всяка нишка. Но тъй като ExecutorService е по-генеричен, отколкото би трябвало да бъде за този случай на употреба (може да поставя задачи на опашка за нишки, ако има повече задачи, отколкото ядра), реших, че може би ще има по-проста конструкция за синхронизиране.

V2 - Циклична бариера

Следващата реализация използва CyclicBarrier за синхронизиране на нишките преди получаване на работа и след завършването й, приблизително както следва:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Отново, това работи добре функционално (прави каквото трябва) и за много големи работни елементи наистина всичките 8 процесора стават силно натоварени, както преди. Въпреки това, тъй като работните елементи стават по-малки, товарът все още намалява драстично:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

За големи работни елементи синхронизацията е незначителна и производителността е идентична с V1. Но неочаквано, резултатите от (високо специализираната) CyclicBarrier изглеждат МНОГО ПО-ЛОШИ от тези за (генеричната) ExecutorService: пропускателната способност (цикли/сек) е само около 1/4 от V1. Предварителното заключение би било, че въпреки че това изглежда е рекламираният идеален случай за използване на CyclicBarrier, той се представя много по-зле от генеричния ExecutorService.

V3 - Изчакване/Уведомяване + CyclicBarrier

Изглежда, че си струва да опитате да замените първата циклична бариера await() с прост механизъм за изчакване/уведомяване:

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Отново, това работи добре функционално (прави това, което трябва).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

Пропускателната способност за малки работни елементи все още е много по-лоша от тази на ExecutorService, но около 2 пъти по-голяма от тази на CyclicBarrier. Елиминирането на една CyclicBarrier елиминира половината празнина.

V4 - Изчакване при заетост вместо изчакване/уведомяване

Тъй като това приложение е основното, което се изпълнява в системата и ядрата така или иначе са неактивни, ако не са заети с работен елемент, защо не опитате натоварено изчакване за работни елементи във всяка нишка, дори ако това върти процесора без нужда. Кодът на работната нишка се променя, както следва:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Също така работи добре функционално (прави това, което трябва).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

За малки работни елементи това увеличава пропускателната способност с още 10% спрямо варианта CyclicBarrier + изчакване/уведомяване, което не е маловажно. Но все още е с много по-ниска производителност от V1 с ExecutorService.

V5 - ?

И така, кой е най-добрият механизъм за синхронизиране за такъв (вероятно не рядко срещан) проблем? Омръзна ми да пиша собствен механизъм за синхронизиране, който да замени изцяло ExecutorService (ако приемем, че е твърде общ и трябва да има нещо, което все още може да бъде извадено, за да стане по-ефективен). Това не е моята област на експертиза и се притеснявам, че бих прекарал много време в отстраняване на грешки (тъй като дори не съм сигурен, че моите варианти за изчакване/уведомяване и изчакване при заето са правилни) за несигурна печалба.

Всеки съвет ще бъде много оценен.


person Community    schedule 26.04.2010    source източник
comment
цифрата за цикли/сек е интересна, но като цяло колко бързо вършите работата си? Ако една нишка завърши преди другите, колко време чака средно всички да се срещнат на бариерата? Вашите работни единици държат ли се в свързана опашка за блокиране?   -  person Ron    schedule 05.02.2011
comment
Звучи като рамката fork/join, въведена в Java 7 ще направи това много добре - и бих предположил, че е оптимизиран по начин, който (средно) би победил вашата персонализирана реализация.   -  person assylias    schedule 01.10.2012


Отговори (6)


Изглежда, че нямате нужда от никаква синхронизация между работниците. Може би трябва да обмислите използването на рамката ForkJoin, която е налична в Java 7, както и отделна библиотека. Някои връзки:

person damiang    schedule 04.10.2012

Актуализация: V6 - Заето изчакване, като основната нишка също работи

Очевидно подобрение на V5 (натоварено изчакване за работа в 7 работни нишки, натоварено изчакване за завършване в главната нишка) изглежда отново раздели работата на 7+1 части и остави основната нишка да обработва една част едновременно с другите работни нишки ( вместо просто заето чакане) и впоследствие заето чакане за завършване на работните елементи на всички други нишки. Това ще използва 8-ия процесор (в 8-ядрената конфигурация на примера) и ще добави неговите цикли към наличния пул от изчислителни ресурси.

Това наистина беше лесно за изпълнение. И резултатите наистина са малко по-добри:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.39
64k         1.0%     98%       6.8
16k         1.0%     98%      50.4
4096        1.0%     98%     372
1024        1.0%     98%    1317
256         1.0%     98%    3546
64          1.5%     98%    9091
16          2.0%     98%   16949

Така че това изглежда представлява най-доброто решение досега.

person Alex Dunlop    schedule 27.04.2010
comment
Това от любопитство ли е или всъщност имате нужда от допълнителна производителност? - person Tudor; 28.09.2012

Актуализация: V5 - Заето изчакване във всички нишки (засега изглежда оптимално)

Тъй като всички ядра са посветени на тази задача, струваше си да опитате просто да елиминирате всички сложни конструкции за синхронизация и да направите натоварено изчакване във всяка точка на синхронизация във всички нишки. Оказва се, че побеждава всички други подходи с голяма разлика.

Настройката е следната: започнете с V4 по-горе (CyclicBarrier + Busy Wait). Заменете CyclicBarrier с AtomicInteger, който основната нишка нулира до нула всеки цикъл. Всяка работна нишка Runnable, която завършва работата си, увеличава атомното цяло число с единица. Главната нишка заета чака:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}

Вместо 8 се стартират само 7 работни нишки (тъй като всички нишки, включително основната нишка, вече зареждат ядрото почти напълно). Резултатите са както следва:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129

Използването на изчакване/уведомяване в работните нишки намалява пропускателната способност до около 1/3 от това решение.

person Alex Dunlop    schedule 26.04.2010

Също така се чудя дали можете да опитате повече от 8 нишки. Ако процесорът ви поддържа HyperThreading, тогава (поне на теория) можете да изстискате 2 нишки на ядро ​​и да видите какво ще излезе от това.

person Community    schedule 26.04.2010

Актуализация: V7 - Изчакване при заетост, което се връща към Изчакване/Уведомяване

След известно заиграване с V6 се оказва, че заетите изчаквания затъмняват малко истинските горещи точки на приложението при профилиране. Плюс това, вентилаторът на системата продължава да се засилва, дори ако не се обработват работни елементи. Така че допълнително подобрение беше натовареното изчакване за работните елементи за фиксиран период от време (да речем, около 2 милисекунди) и след това връщане към „по-хубава“ комбинация wait()/notify(). Работните нишки просто публикуват текущия си режим на изчакване в главната нишка чрез атомно булево значение, което показва дали са заети с чакане (и следователно просто трябва да бъде зададен работен елемент) или очакват извикване за notify(), защото са в изчакайте().

Друго подобрение, което се оказа доста лесно, беше да се позволи на нишките, които са завършили основния си работен елемент, многократно да извикват обратно извикване, предоставено от клиента, докато чакат другите нишки да завършат своите първични работни елементи. По този начин времето за изчакване (което се случва, защото нишките са длъжни да получат леко различни работни натоварвания) не трябва да бъде напълно загубено за приложението.

Все още съм много заинтересован да чуя от други потребители, които са се сблъскали с подобен случай на употреба.

person Alex Dunlop    schedule 28.04.2010

Просто се насочих към тази тема и въпреки че е почти на една година, нека ви насоча към библиотеката "jbarrier", която разработихме в университета в Бон преди няколко месеца:

http://net.cs.uni-bonn.de/wg/cs/applications/jbarrier/

Бариерният пакет е насочен точно към случая, когато броят на работните нишки е ‹= броят на ядрата. Пакетът е базиран на заето изчакване, той поддържа не само бариерни действия, но и глобални редукции и освен централна бариера предлага дървовидно структурирани бариери за паралелизиране на частите за синхронизация/редукция още повече.

person Patrick    schedule 05.02.2011