Коя конструкция за синхронизиране на Java е вероятно да осигури най-добрата производителност за сценарий на едновременна, итеративна обработка с фиксиран брой нишки като този, описан по-долу? След като експериментирах сам за известно време (използвайки ExecutorService и CyclicBarrier) и бях донякъде изненадан от резултатите, ще съм благодарен за някои експертни съвети и може би някои нови идеи. Съществуващите въпроси тук изглежда не се фокусират основно върху ефективността, следователно този нов. Благодаря предварително!
Ядрото на приложението е прост итеративен алгоритъм за обработка на данни, паралелно разпределен на изчислителното натоварване в 8 ядра на Mac Pro, работещ под OS X 10.6 и Java 1.6.0_07. Данните, които трябва да бъдат обработени, се разделят на 8 блока и всеки блок се подава към Runnable, за да се изпълни от една от фиксиран брой нишки. Паралелизирането на алгоритъма беше сравнително лесно и той функционално работи според желанията, но неговата производителност все още не е това, което смятам, че може да бъде. Приложението изглежда прекарва много време в синхронизиране на системни повиквания, така че след известно профилиране се чудя дали съм избрал най-подходящия механизъм(и) за синхронизиране.
Ключово изискване на алгоритъма е, че той трябва да работи на етапи, така че нишките трябва да се синхронизират в края на всеки етап. Основната нишка подготвя работата (много ниски режийни разходи), предава я на нишките, оставя ги да работят върху нея, след което продължава, когато всички нишки са готови, пренарежда работата (отново много ниски режийни) и повтаря цикъла. Машината е посветена на тази задача, събирането на боклука е сведено до минимум чрез използване на пулове на нишки от предварително разпределени елементи и броят на нишките може да бъде фиксиран (без входящи заявки или други подобни, само една нишка на ядро на процесора).
V1 - ExecutorService
Първата ми реализация използва ExecutorService с 8 работни нишки. Програмата създава 8 задачи, които държат работата и след това им позволява да работят върху нея, приблизително така:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
Това работи добре функционално (прави това, което трябва) и за много големи работни елементи наистина всичките 8 процесора стават силно натоварени, доколкото се очаква да позволи алгоритъмът за обработка (някои работни елементи ще завършат по-бързо от други, след това ще се попречат) . Въпреки това, тъй като работните елементи стават по-малки (а това всъщност не е под контрола на програмата), натоварването на процесора на потребителя се свива драстично:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
Легенда: - размер на блок = размер на работния елемент (= изчислителни стъпки) - система = системно натоварване, както е показано в OS X Activity Monitor (червена лента) - потребител = потребителско натоварване, както е показано в OS X Activity Monitor (зелена лента) - цикли/сек = итерации през основния цикъл while, повече е по-добре
Основната област на безпокойство тук е високият процент време, прекарано в системата, което изглежда се управлява от повиквания за синхронизиране на нишки. Както се очаква, за по-малки работни елементи, ExecutorService.invokeAll() ще изисква относително повече усилия за синхронизиране на нишките в сравнение с количеството работа, извършвана във всяка нишка. Но тъй като ExecutorService е по-генеричен, отколкото би трябвало да бъде за този случай на употреба (може да поставя задачи на опашка за нишки, ако има повече задачи, отколкото ядра), реших, че може би ще има по-проста конструкция за синхронизиране.
V2 - Циклична бариера
Следващата реализация използва CyclicBarrier за синхронизиране на нишките преди получаване на работа и след завършването й, приблизително както следва:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Отново, това работи добре функционално (прави каквото трябва) и за много големи работни елементи наистина всичките 8 процесора стават силно натоварени, както преди. Въпреки това, тъй като работните елементи стават по-малки, товарът все още намалява драстично:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
За големи работни елементи синхронизацията е незначителна и производителността е идентична с V1. Но неочаквано, резултатите от (високо специализираната) CyclicBarrier изглеждат МНОГО ПО-ЛОШИ от тези за (генеричната) ExecutorService: пропускателната способност (цикли/сек) е само около 1/4 от V1. Предварителното заключение би било, че въпреки че това изглежда е рекламираният идеален случай за използване на CyclicBarrier, той се представя много по-зле от генеричния ExecutorService.
V3 - Изчакване/Уведомяване + CyclicBarrier
Изглежда, че си струва да опитате да замените първата циклична бариера await() с прост механизъм за изчакване/уведомяване:
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Отново, това работи добре функционално (прави това, което трябва).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
Пропускателната способност за малки работни елементи все още е много по-лоша от тази на ExecutorService, но около 2 пъти по-голяма от тази на CyclicBarrier. Елиминирането на една CyclicBarrier елиминира половината празнина.
V4 - Изчакване при заетост вместо изчакване/уведомяване
Тъй като това приложение е основното, което се изпълнява в системата и ядрата така или иначе са неактивни, ако не са заети с работен елемент, защо не опитате натоварено изчакване за работни елементи във всяка нишка, дори ако това върти процесора без нужда. Кодът на работната нишка се променя, както следва:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
Също така работи добре функционално (прави това, което трябва).
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
За малки работни елементи това увеличава пропускателната способност с още 10% спрямо варианта CyclicBarrier + изчакване/уведомяване, което не е маловажно. Но все още е с много по-ниска производителност от V1 с ExecutorService.
V5 - ?
И така, кой е най-добрият механизъм за синхронизиране за такъв (вероятно не рядко срещан) проблем? Омръзна ми да пиша собствен механизъм за синхронизиране, който да замени изцяло ExecutorService (ако приемем, че е твърде общ и трябва да има нещо, което все още може да бъде извадено, за да стане по-ефективен). Това не е моята област на експертиза и се притеснявам, че бих прекарал много време в отстраняване на грешки (тъй като дори не съм сигурен, че моите варианти за изчакване/уведомяване и изчакване при заето са правилни) за несигурна печалба.
Всеки съвет ще бъде много оценен.