Така че изглежда, че моето внедряване се справя дори с основния сериен qsort при около 1 милиард елемента. Повечето паралелни алгоритми за qsort онлайн са за сортиране на масиви с цели числа и други неща, но искам да мога да сортирам всичко, използвайки персонализирани компаратори като вградения qsort, защото бих искал да сортирам някои структури, които имам. Използвам 12 нишки и мога да проверя, че се зараждат правилно, гледайки отгоре. Може би създавам твърде много и трябва да спра да създавам нови нишки въз основа на дълбочината на рекурсията? Знам, че моето внедряване на qsort е доста основно и очевидно вграденият qsort е имал много работа и оптимизация, но не виждам защо не получавам добро ускоряване с паралелизиране. Всеки принос ще бъде много оценен, тъй като бих могъл да използвам този код в много области, ако мога да го запазя общ. Благодаря!
void test ( void* data, uint64_t startIdx, uint64_t endIdx, size_t dataSize, int (*cmp)(const void *, const void *) )
{
#pragma omp parallel
{
#pragma omp single nowait
{
p_qsort( data, 0, MAX_INTS - 1, sizeof (testint), cmp );
}
}
}
void p_qsort ( void* data, uint64_t startIdx, uint64_t endIdx, size_t dataSize, int (*cmp)(const void *, const void *) )
{
uint64_t idx = p_qsort_partition( data, startIdx, endIdx, dataSize, cmp );
//Left array
if ( startIdx < idx - 1 )
{
#pragma omp task
p_qsort( data, startIdx, idx - 1, dataSize, cmp );
}
//Right array
if ( endIdx > idx )
{
#pragma omp task
p_qsort( data, idx, endIdx, dataSize, cmp );
}
}
void swapVoidElements ( void* el1, void* el2, size_t size )
{
if ( el1 == el2 )
return;
void* temp = malloc( size );
//temp = el1
memcpy( temp, el1, size );
//el1 = el2
memcpy( el1, el2, size );
//el2 = temp
memcpy( el2, temp, size );
free( temp );
}
uint64_t p_qsort_partition ( void* data, uint64_t left, uint64_t right, size_t dataSize, int (*cmp)(const void *, const void *) )
{
void* pivotP = getVoidPtr( data, left, dataSize );
void* pivotCmp = malloc( dataSize );
memcpy( pivotCmp, pivotP, dataSize );
while ( left <= right )
{
while ( cmp( getVoidPtr( data, left, dataSize ), pivotCmp ) < 0 )
left++;
//while ( array[right] > pivot )
while ( cmp( getVoidPtr( data, right, dataSize ), pivotCmp ) > 0 )
right--;
//Swap
if ( left <= right )
{
void* leftP = getVoidPtr( data, left, dataSize );
void* rightP = getVoidPtr( data, right, dataSize );
swapVoidElements( leftP, rightP, dataSize );
left++;
right--;
}
}
free( pivotCmp );
return left;
}
void* getVoidPtr ( void* data, uint64_t idx, size_t dataSize )
{
uint64_t idxNum = idx * dataSize;
char* test = ((char*) data) + idxNum;
return (void *) test;
}