Конвертирайте matlab в c++, bsxfun

Опитвам се да преобразувам своя MATLAB код в C++ и откривам, че има проблем в следната ситуация:

MATLAB

A = rand(1000,40000);
b = rand(1000,1);
tic;
ans = bsxfun(@ne,b,A);
toc

c++

std::vector<std::vector<int> > A;
std::vector<int> b;
std::vector<int> ans(10000);

// initial A and b
const clock_t begin_time = clock();
for(int i = 0; i < 40000; ++i){
    for(int j = 0; j < 1000; ++j){
        if(A[i][j] != b[j])
            ans[i]++;
    }
}
double run_time = static_cast<double>((clock() - begin_time)) / CLOCKS_PER_SEC;

Намирам, че C++ case е три пъти по-бавен от MATLAB. Бих искал да попитам дали някой знае как да променя C++ кода, така че да мога да имам подобна или същата производителност като bsxfun?

След като потърсих в мрежата, намирам два възможни начина:

  1. включват библиотеки от Armadillo
  2. включват библиотеки от Octave

Но въпросът е, че не съм сигурен как да го направя, искам да кажа, че не знам подробностите за изпълнението.

Резюме:

  1. Бих искал да попитам дали някой знае как да променя C++ кода, така че да мога да имам подобна или същата производителност като bsxfun?
  2. Може ли някой да даде някои съвети или стъпки или пример, за да мога да науча как да включа Armadillo или Octave, за да изпълня тази задача.

РЕДАКТИРАНЕ:

Благодарение на @Peter компилирам с опция -O3 и след това проблемът е „решен“, искам да кажа, че скоростта е същата като MATLAB.


person sflee    schedule 17.01.2014    source източник
comment
Включвате ли инициализацията на A и b във вашия график?   -  person Joseph Mansfield    schedule 17.01.2014
comment
Помага ли, ако замените if с ans[i] += A[j][i] != b[j];? (Няма неуспешни прогнози за разклонения)   -  person Klas Lindbäck    schedule 17.01.2014
comment
Опитвал ли си паралелизиране? В зависимост от вашата система можете да използвате pthread например. След това създавате 4 нишки, всяка от които изчислява част от проблема. Това трябва да ускори изчислението ви с коефициент почти 4 (също в зависимост от вашия хардуер).   -  person omgBob    schedule 17.01.2014
comment
@sftrabbit не, не включвам това време   -  person sflee    schedule 17.01.2014
comment
@KlasLindbäck Опитах и ​​не помага   -  person sflee    schedule 17.01.2014
comment
Заменете if, както каза @KlasLindbäck, или дори по-добре: ans[i] = A[j][i]!= b[j] (не се изисква добавяне)   -  person Luis Mendo    schedule 17.01.2014
comment
Това вероятно ще бъде проблем, свързан с паметта, така че многонишковостта може и да не спечели много - но както подсказва отговорът на @Peter, ефективността на кеша вероятно ще бъде пътят напред.   -  person Edric    schedule 17.01.2014
comment
@sflee: двата кода не изчисляват едно и също нещо! Да не говорим, че ans във версията на C++ е с грешен размер (трябва да е 40000). Поправете ме, ако греша, но мисля, че искахте да напишете ans = sum(bsxfun(@ne,b,A)); в MATLAB (извикването SUM(.) липсваше)   -  person Amro    schedule 18.01.2014
comment
Не препоръчвам да използвате std::vector‹std::vector‹int›› за числени задачи. Неефективно е от много гледни точки, особено от скорост. Вместо това ще получите много по-добра производителност, като използвате специална библиотека за линейна алгебра, като Armadillo. Вижте например класа Mat, който съхранява елементите последователно в ред на главни колони, като Fortran. Също така, много функции на Armadillo са подобни на функциите на Matlab.   -  person mtall    schedule 20.01.2014


Отговори (2)


1- Пускате своите цикли в грешен ред. В C и C++ 2D масивите се съхраняват по-големи от редове, което означава, че A[j][i] и A[j][i+1] са един до друг в паметта. (Мислете за това по следния начин: A[j] е първата операция с индекс, връщаща препратка към друг вектор, който след това отново индексирате с [i]).

Поддържането на данни в кеша за възможно най-много операции е един от ключовете към производителността на модерен процесор, което означава, че искате да имате достъп до съседни елементи, когато можете. Така че сменете реда на циклите:

for(int j = 0; j < 1000; ++j){
    for(int i = 0; i < 40000; ++i){

2 - Опциите на компилатора имат голямо значение. Уверете се, че изграждате в режим „Освобождаване“ или с включена оптимизация.

3- Обичайно е да съхранявате 2D масиви в C++ като 1D масив, като сами индексирате реда/колоната с умножения. Тоест, A ще бъде вектор с размер 1000*40000, а A[j][i] вместо това ще бъде A[j*row_length + i]. Това има предимството на повече непрекъсната памет (вижте точка 1), по-малко динамично разпределение на паметта и по-добро използване на кеша.

person Peter    schedule 17.01.2014
comment
#3 е грешен, защото говорите за масиви, които вече се съхраняват последователно. Векторът на векторите във въпроса е съвсем различен въпрос, където вашият съвет е валиден. - person Ben Voigt; 18.01.2014

Както споменах в коментарите, във вашия MATLAB код липсва извикване на функцията sum (в противен случай двата кода изчисляват различни неща!). Така че трябва да бъде:

MATLAB

A = rand(1000,40000);
B = rand(1000,1);
tic
count = sum(bsxfun(@ne, A, B));
toc

На моята машина получавам:

Elapsed time is 0.036931 seconds.

Не забравяйте, че горното твърдение е векторизирано (мислете за паралелизиране на SIMD). MATLAB може също така автоматично да изпълнява този многопоточен, ако размерът е достатъчно голям.


Ето моята версия на кода в C++. Използвам прости класове за създаване на векторен/матричен интерфейс. Имайте предвид, че основните данни се съхраняват основно като 1D масив с главен ред на колони подобно на MATLAB.

C++

#include <iostream>
#include <cstdlib>        // rand
#include <ctime>          // time
#include <sys/time.h>     // gettimeofday

class Timer
{
private:
    timeval t1, t2;
public:
    Timer() {}
    ~Timer() {}
    void start() { gettimeofday(&t1, NULL); }
    void stop() { gettimeofday(&t2, NULL); }
    double elapsedTime() { return (t2.tv_sec - t1.tv_sec)*1000.0 + (t2.tv_usec - t1.tv_usec)/1000; }
};

template<typename T>
class Vector
{
private:
    T *data;
    const size_t num;
public:
    Vector(const size_t num) : num(num) { data = new T[num]; }
    ~Vector() { delete[] data; }
    inline T& operator() (const size_t i) { return data[i]; }
    inline const T& operator() (const size_t i) const { return data[i]; }
    size_t size() const { return num; }
};

template<typename T>
class Matrix
{
private:
    T *data;
    const size_t nrows, ncols;
public:
    Matrix(const size_t nr, const size_t nc) : nrows(nr), ncols(nc) { data = new T[nrows * ncols]; }
    ~Matrix() { delete[] data; }
    inline T& operator() (const size_t r, const size_t c) { return data[c*nrows + r]; }
    inline const T& operator() (const size_t r, const size_t c) const { return data[c*nrows + r]; }
    size_t size1() const { return nrows; }
    size_t size2() const { return ncols; }
};

inline double rand_double(double min=0.0, double max=1.0)
{
    return (max - min) * (static_cast<double>(rand()) / RAND_MAX) + min;
}

int main() {
    // seed random number generator
    srand( static_cast<unsigned int>(time(NULL)) );

    // intialize data
    const int m = 1000, n = 40000;
    Matrix<double> A(m,n);
    Vector<double> B(m);
    for(size_t i=0; i<A.size1(); i++) {
        B(i) = rand_double();
        for(size_t j=0; j<A.size2(); j++) {
            A(i,j) = rand_double();
        }
    }

    // measure timing
    Timer timer;
    timer.start();

    // in MATLAB: count = sum(bsxfun(@ne, A, B))
    Vector<double> count(n);
    #pragma omp parallel for
    for(int j=0; j<n; ++j) {
        count(j) = 0.0;
        for(int i=0; i<m; i++) {
            count(j) += (A(i,j) != B(i));
        }
    }

    timer.stop();

    // elapsed time in milliseconds
    std::cout << "Elapsed time is " << timer.elapsedTime() << " milliseconds." << std::endl;

    return 0;
}

Резултатът:

$ g++ -Wall -O3 test.cpp -o test
$ ./test
Elapsed time is 63 milliseconds.

Ако компилирам и го стартирам с активирана поддръжка на OpenMP, получавам:

$ g++ -Wall -O3 -fopenmp test.cpp -o test_omp
$ ./test_omp
Elapsed time is 16 milliseconds.

Не е лошо подобрение (почти x4 по-бързо) само чрез добавяне на един ред към кода (макросът pargma omp).

Това последното бие 37 ms, които получавам в MATLAB (R2013b). Кодът е компилиран с помощта на GCC 4.8.1 (MinGW-w64, работещ на Windows 8, лаптоп Core i7).


Ако наистина искате да надхвърлите ограниченията тук за C++ кода, ще трябва да добавите векторизация (SSE/AVX intrinsics) в допълнение към многонишковостта, постигната с OpenMP.

Може също да обмислите използването на програмиране на GPGPU (CUDA, OpenCL). В MATLAB това е много лесно да се направи:

AA = gpuArray(A);
BB = gpuArray(B);
CC = sum(bsxfun(@ne, AA, BB));
C = gather(CC);

gpuArray(.) ще прехвърли матрицата към GPU, след което всички операции, извършени върху нея, се извършват на GPU устройството вместо на CPU. gather(.) ще прехвърли масива обратно в работното пространство на MATLAB. Проблемът тук обаче до голяма степен е обвързан с паметта, така че вероятно няма да видите подобрение (вероятно дори по-бавно поради претоварването на прехвърлянето на данни).

person Amro    schedule 18.01.2014