Стартиране на нишки с async

В последния раздел видяхме как данните могат да се предават от работна нишка към родителската нишка с помощта на обещания и фючърси. Недостатък на подхода обещание-бъдеще обаче е, че е много тромаво (и включва много шаблонен код) да се предаде обещанието към функцията на нишката, използвайки препратка към rvalue и std::move. За простата задача за връщане на данни или изключения от работна нишка към родителската нишка обаче има по-прост и по-удобен начин с използване на std::async() вместо std::thread().

Нека адаптираме примерния код от последния раздел, за да използваме std::async:

#include <iostream>
#include <thread>
#include <future>
#include <cmath>
#include <vector>
#include <chrono>

void workerFunction(int n)
{
    // print system id of worker thread
    std::cout << "Worker thread id = " << std::this_thread::get_id() << std::endl;
    // perform work
    for (int i = 0; i < n; ++i)
    {
        sqrt(12345.6789);
    }
}
int main()
{
    // print system id of worker thread
    std::cout << "Main thread id = " << std::this_thread::get_id() << std::endl;
    // start time measurement
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    
    // launch various tasks
    std::vector<std::future<void>> futures;
    int nLoops = 10, nThreads = 5;
    for (int i = 0; i < nThreads; ++i)
    {
        futures.emplace_back(std::async(workerFunction, nLoops));
    }
    // wait for tasks to complete
    for (const std::future<void> &ftr : futures)
        ftr.wait();
    // stop time measurement and print execution time
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
    std::cout << "Execution finished after " << duration <<" microseconds" << std::endl;
    
    return 0;
}

Първата промяна, която правим, е във функцията на нишката: премахваме обещанието от списъка с аргументи, както и блока try-catch. Освен това типът връщане на функцията се променя от void на double, тъй като резултатът от изчислението ще бъде насочен обратно към основната нишка с помощта на просто връщане. След тези промени функцията няма познания за нишки, нито за фючърси или обещания — това е проста функция, която приема две двойни като аргументи и връща двойна като резултат. Освен това ще хвърли изключение при опит за деление на нула.

В основната нишка трябва да заменим извикването на std::thread с std::async. Имайте предвид, че async връща бъдеще, което ще използваме по-късно в кода, за да извлечем стойността, върната от функцията. Обещание, както при std::thread, вече не е необходимо, така че кодът става много по-кратък. В блока try-catch нищо не се е променило - ние все още извикваме get() за бъдещето в блока try-catch и обработката на изключения се случва непроменена в блока catch. Освен това вече не е необходимо да се обаждаме на join(). С async, деструкторът на нишки ще бъде извикан автоматично - което намалява риска от грешка в паралелността.

Една от основните разлики между std::thread и std::async е, че с последния системата решава дали свързаната функция трябва да се изпълнява асинхронно или синхронно. Чрез ръчно регулиране на параметрите за стартиране на std::async можем директно да повлияем дали свързаната функция на нишката ще се изпълнява синхронно или асинхронно.

Едновременност, базирана на задачи

Определянето на оптималния брой нишки за използване е труден проблем. Обикновено зависи от броя на наличните ядра дали има смисъл да се изпълнява код като нишка или по последователен начин. Използването на std::async (и следователно на задачите) премахва тежестта на това решение от потребителя и позволява на системата да реши дали да изпълни кода последователно или като нишка. Със задачите програмистът решава какво МОЖЕ да се изпълнява паралелно по принцип и след това системата решава по време на изпълнение какво ЩЕ се изпълнява паралелно.

Вътрешно това се постига чрез използване на пулове от нишки, които представляват броя на наличните нишки въз основа на ядрата/процесорите, както и чрез използване на опашки за кражба на работа, където задачите се преразпределят динамично между наличните процесори. Следната диаграма показва принципа на разпределение на задачи в многоядрена система, използвайки опашки за кражба на работа.

Както може да се види, първото ядро ​​в примера е силно пренатоварено с няколко задачи, които чакат да бъдат изпълнени. Другите ядра обаче работят на празен ход. Идеята на опашката за кражба на работа е да има програма за наблюдение, работеща във фонов режим, която редовно следи количеството работа, извършена от всеки процесор, и я преразпределя според нуждите. За горния пример това би означавало, че задачите, чакащи за изпълнение на първото ядро, ще бъдат преместени (или „откраднати“) от заетите ядра и добавени към наличните свободни ядра, така че времето на престой да бъде намалено. След тази процедура за пренареждане разпределението на задачите в нашия пример може да изглежда както е показано на следващата диаграма.

разпределението на работата по този начин може да работи само когато паралелизмът е изрично описан в програмата от програмиста. Ако това не е така, кражбата на работа няма да се представи ефективно.

В заключение на този раздел е дадено общо сравнение на програмиране, базирано на задачи и базирано на нишки:

При задачите системата се грижи за много подробности (напр. присъединяване). При нишките програмистът е отговорен за много подробности. Що се отнася до ресурсите, нишките обикновено са по-тежки, тъй като се генерират от операционната система (ОС). Отнема време, за да бъде извикана ОС и да разпредели памет / стек / структури от данни на ядрото за нишката. Освен това унищожаването на нишката е скъпо. Задачите от друга страна са по-леки, тъй като ще използват набор от вече създадени нишки („пулът от нишки“).

Нишките и задачите се използват за различни проблеми. Нишките имат повече общо със закъснението. Когато имате функции, които могат да блокират (напр. въвеждане на файл, връзка със сървъра), нишките могат да избегнат блокирането на програмата, когато напр. сървърът чака отговор. Задачите от друга страна се фокусират върху пропускателната способност, където много операции се изпълняват паралелно.

#include <iostream>
#include <thread>
#include <future>
#include <cmath>
#include <memory>

double divideByNumber(double num, double denom)
{
    // print system id of worker thread
    std::cout << "Worker thread id = " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(500)); // simulate work
    if (denom == 0)
        throw std::runtime_error("Exception from thread#: Division by zero!");
    return num / denom;
}

int main()
{
    // print system id of worker thread
    std::cout << "Main thread id = " << std::this_thread::get_id() << std::endl;
    // use async to start a task
    double num = 42.0, denom = 2.0;
    std::future<double> ftr = std::async(std::launch::deferred, divideByNumber, num, denom);
    // retrieve result within try-catch-block
    try
    {
        double result = ftr.get();
        std::cout << "Result = " << result << std::endl;
    }
    catch (std::runtime_error e)
    {
        std::cout << e.what() << std::endl;
    }
    return 0;
}

Стартирането и управлението на нишки отнема значително време. Следователно не е общо предимство, ако изчисленията се извършват паралелно: Трябва внимателно да се претегли по отношение на изчислителните усилия дали паралелизирането има смисъл.