Запуск потоков с асинхронным

В последнем разделе мы увидели, как данные могут быть переданы из рабочего потока в родительский поток с использованием промисов и фьючерсов. Недостатком подхода promise-future, однако, является то, что он очень громоздкий (и включает в себя много стандартного кода) для передачи обещания функции потока с использованием ссылки rvalue и std::move. Однако для прямой задачи возврата данных или исключений из рабочего потока в родительский поток существует более простой и удобный способ использования std::async() вместо std::thread().

Давайте адаптируем пример кода из последнего раздела для использования std::async:

#include <iostream>
#include <thread>
#include <future>
#include <cmath>
#include <vector>
#include <chrono>

void workerFunction(int n)
{
    // print system id of worker thread
    std::cout << "Worker thread id = " << std::this_thread::get_id() << std::endl;
    // perform work
    for (int i = 0; i < n; ++i)
    {
        sqrt(12345.6789);
    }
}
int main()
{
    // print system id of worker thread
    std::cout << "Main thread id = " << std::this_thread::get_id() << std::endl;
    // start time measurement
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    
    // launch various tasks
    std::vector<std::future<void>> futures;
    int nLoops = 10, nThreads = 5;
    for (int i = 0; i < nThreads; ++i)
    {
        futures.emplace_back(std::async(workerFunction, nLoops));
    }
    // wait for tasks to complete
    for (const std::future<void> &ftr : futures)
        ftr.wait();
    // stop time measurement and print execution time
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
    std::cout << "Execution finished after " << duration <<" microseconds" << std::endl;
    
    return 0;
}

Первое изменение, которое мы вносим в функцию потока: мы удаляем обещание из списка аргументов, а также из блока try-catch. Кроме того, тип возвращаемого значения функции изменен с void на double, поскольку результат вычислений будет направлен обратно в основной поток с использованием простого возврата. После этих изменений функция ничего не знает ни о потоках, ни о фьючерсах, ни промисах — это простая функция, которая принимает два значения типа double в качестве аргументов и возвращает значение типа double в качестве результата. Кроме того, он выдаст исключение при попытке деления на ноль.

В основном потоке нам нужно заменить вызов std::thread на std::async. Обратите внимание, что async возвращает будущее, которое мы будем использовать позже в коде для получения значения, возвращаемого функцией. Промис, как и в случае с std::thread, больше не нужен, поэтому код становится намного короче. В блоке try-catch ничего не изменилось — мы по-прежнему вызываем get() для будущего в блоке try, а обработка исключений происходит без изменений в блоке catch. Кроме того, нам больше не нужно вызывать join(). С async деструктор потока будет вызываться автоматически, что снижает риск ошибки параллелизма.

Одно из основных различий между std::thread и std::async заключается в том, что в последнем случае система решает, следует ли запускать связанную функцию асинхронно или синхронно. Настраивая параметры запуска std::async вручную, мы можем напрямую влиять на то, будет ли связанная функция потока выполняться синхронно или асинхронно.

Параллелизм на основе задач

Определение оптимального количества потоков для использования является сложной задачей. Обычно от количества доступных ядер зависит, имеет ли смысл выполнять код как поток или последовательно. Использование std::async (и, следовательно, задач) снимает бремя этого решения с пользователя и позволяет системе решать, выполнять ли код последовательно или в виде потока. С задачами программист решает, что МОЖЕТ выполняться параллельно в принципе, а затем система во время выполнения решает, что БУДЕТ выполняться параллельно.

Внутренне это достигается за счет использования пулов потоков, которые представляют количество доступных потоков на основе ядер/процессоров, а также за счет использования очередей перехвата работы, где задачи динамически перераспределяются между доступными процессорами. На следующей диаграмме показан принцип распределения задач в многоядерной системе с использованием очередей перехвата работы.

Как видно, первое ядро ​​в примере сильно перегружено несколькими задачами, ожидающими выполнения. Однако остальные ядра работают вхолостую. Идея очереди перехвата работы состоит в том, чтобы в фоновом режиме работала сторожевая программа, которая регулярно отслеживает объем работы, выполняемой каждым процессором, и перераспределяет ее по мере необходимости. Для приведенного выше примера это будет означать, что задачи, ожидающие выполнения на первом ядре, будут перемещены (или «украдены») с занятых ядер и добавлены к доступным свободным ядрам, чтобы сократить время простоя. После этой процедуры перестановки распределение задач в нашем примере может выглядеть так, как показано на следующей диаграмме.

распределение работы таким образом может работать только тогда, когда параллелизм явно описан в программе программистом. Если это не так, кража работы не будет эффективной.

В завершение этого раздела приведено общее сравнение программирования на основе задач и программирования на основе потоков:

С задачами система заботится о многих деталях (например, о присоединении). В случае с потоками программист отвечает за многие детали. Что касается ресурсов, потоки обычно имеют больший вес, поскольку они генерируются операционной системой (ОС). Требуется время, чтобы ОС вызвалась и выделила память/стек/структуры данных ядра для потока. Кроме того, уничтожение нити дорого обходится. Задачи, с другой стороны, более легкие, поскольку они будут использовать пул уже созданных потоков («пул потоков»).

Потоки и задачи используются для решения разных задач. Потоки имеют больше общего с задержкой. Когда у вас есть функции, которые могут блокироваться (например, ввод файла, подключение к серверу), потоки могут избежать блокировки программы, когда, например. сервер ждет ответа. С другой стороны, задачи сосредоточены на пропускной способности, когда многие операции выполняются параллельно.

#include <iostream>
#include <thread>
#include <future>
#include <cmath>
#include <memory>

double divideByNumber(double num, double denom)
{
    // print system id of worker thread
    std::cout << "Worker thread id = " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(500)); // simulate work
    if (denom == 0)
        throw std::runtime_error("Exception from thread#: Division by zero!");
    return num / denom;
}

int main()
{
    // print system id of worker thread
    std::cout << "Main thread id = " << std::this_thread::get_id() << std::endl;
    // use async to start a task
    double num = 42.0, denom = 2.0;
    std::future<double> ftr = std::async(std::launch::deferred, divideByNumber, num, denom);
    // retrieve result within try-catch-block
    try
    {
        double result = ftr.get();
        std::cout << "Result = " << result << std::endl;
    }
    catch (std::runtime_error e)
    {
        std::cout << e.what() << std::endl;
    }
    return 0;
}

Запуск потоков и управление ими занимает значительное количество времени. Таким образом, параллельное выполнение вычислений не является общим преимуществом: необходимо тщательно взвесить с точки зрения вычислительных усилий, имеет ли смысл распараллеливание.