Таким образом, кажется, что моя реализация не работает даже с базовым последовательным qsort около 1 миллиарда элементов. Большинство параллельных алгоритмов qsort в Интернете предназначены для сортировки целочисленных массивов и прочего, но я хочу иметь возможность сортировать что-либо с помощью пользовательских компараторов, таких как встроенный qsort, потому что я хотел бы сортировать некоторые структуры, которые у меня есть. Я использую 12 потоков и могу убедиться, что они правильно порождаются, глядя сверху. Возможно, я создаю слишком много потоков и должен перестать создавать новые потоки на основе глубины рекурсии? Я знаю, что моя реализация qsort довольно проста, и очевидно, что над встроенной qsort было вложено много работы и оптимизации, но я не понимаю, почему я не получаю хорошего ускорения с распараллеливанием. Буду очень признателен за любой вклад, так как я мог бы использовать этот код во многих областях, если бы я мог сохранить его общим. Спасибо!
void test ( void* data, uint64_t startIdx, uint64_t endIdx, size_t dataSize, int (*cmp)(const void *, const void *) )
{
#pragma omp parallel
{
#pragma omp single nowait
{
p_qsort( data, 0, MAX_INTS - 1, sizeof (testint), cmp );
}
}
}
void p_qsort ( void* data, uint64_t startIdx, uint64_t endIdx, size_t dataSize, int (*cmp)(const void *, const void *) )
{
uint64_t idx = p_qsort_partition( data, startIdx, endIdx, dataSize, cmp );
//Left array
if ( startIdx < idx - 1 )
{
#pragma omp task
p_qsort( data, startIdx, idx - 1, dataSize, cmp );
}
//Right array
if ( endIdx > idx )
{
#pragma omp task
p_qsort( data, idx, endIdx, dataSize, cmp );
}
}
void swapVoidElements ( void* el1, void* el2, size_t size )
{
if ( el1 == el2 )
return;
void* temp = malloc( size );
//temp = el1
memcpy( temp, el1, size );
//el1 = el2
memcpy( el1, el2, size );
//el2 = temp
memcpy( el2, temp, size );
free( temp );
}
uint64_t p_qsort_partition ( void* data, uint64_t left, uint64_t right, size_t dataSize, int (*cmp)(const void *, const void *) )
{
void* pivotP = getVoidPtr( data, left, dataSize );
void* pivotCmp = malloc( dataSize );
memcpy( pivotCmp, pivotP, dataSize );
while ( left <= right )
{
while ( cmp( getVoidPtr( data, left, dataSize ), pivotCmp ) < 0 )
left++;
//while ( array[right] > pivot )
while ( cmp( getVoidPtr( data, right, dataSize ), pivotCmp ) > 0 )
right--;
//Swap
if ( left <= right )
{
void* leftP = getVoidPtr( data, left, dataSize );
void* rightP = getVoidPtr( data, right, dataSize );
swapVoidElements( leftP, rightP, dataSize );
left++;
right--;
}
}
free( pivotCmp );
return left;
}
void* getVoidPtr ( void* data, uint64_t idx, size_t dataSize )
{
uint64_t idxNum = idx * dataSize;
char* test = ((char*) data) + idxNum;
return (void *) test;
}