epoll IO с рабочими потоками в C

Я пишу небольшой сервер, который будет получать данные из нескольких источников и обрабатывать эти данные. Источники и полученные данные значительны, но не более чем epoll должен уметь справляться достаточно хорошо. Однако все полученные данные необходимо проанализировать и запустить через большое количество тестов, что отнимает много времени и блокирует один поток, несмотря на мультиплексирование epoll. По сути, шаблон должен быть примерно таким: цикл ввода-вывода получает данные и объединяет их в задание, отправляет их первому потоку, доступному в пуле, пакет обрабатывается заданием, и результат передается пакетом в цикл ввода-вывода для запись в файл.

Я решил использовать один поток ввода-вывода и N рабочих потоков. Поток ввода-вывода для приема TCP-соединений и чтения данных легко реализовать, используя пример, представленный по адресу: http://linux.die.net/man/7/epoll

С потоками также обычно достаточно легко справиться, но я изо всех сил пытаюсь элегантно объединить цикл ввода-вывода epoll с пулом потоков. Я также не могу найти никаких "лучших практик" для использования epoll с рабочим пулом в Интернете, но есть довольно много вопросов по той же теме.

Поэтому у меня есть вопрос, и я надеюсь, что кто-нибудь поможет мне ответить:

  1. Можно (и нужно) использовать eventfd как механизм для двусторонней синхронизации между потоком ввода-вывода и всеми рабочими? Например, неплохо ли для каждого рабочего потока иметь свою собственную процедуру epoll, ожидающую совместного использования eventfd (с указателем структуры, содержащим данные / информацию о задании), то есть каким-то образом использовать eventfd в качестве очереди заданий? Также, возможно, есть еще один eventfd для передачи результатов обратно в поток ввода-вывода из нескольких рабочих потоков?
  2. После того, как поток ввода-вывода получает сигнал о большем количестве данных в сокете, должно ли происходить фактическое получение данных в потоке ввода-вывода или рабочий должен получать данные самостоятельно, чтобы не блокировать поток ввода-вывода при анализе фреймов данных и т. Д.? В таком случае, как я могу обеспечить безопасность, например в случае, если recv читает 1,5 кадра данных в рабочем потоке, а другой рабочий поток получает последние 0,5 кадра данных из того же соединения?
  3. Если пул рабочих потоков реализован через мьютексы и тому подобное, будет ли ожидание блокировок блокировать поток ввода-вывода, если N + 1 потоки пытаются использовать одну и ту же блокировку?
  4. Существуют ли какие-либо образцы хорошей практики для создания пула рабочих потоков вокруг epoll с двусторонней связью (т.е. как от ввода-вывода к рабочим, так и обратно)?

РЕДАКТИРОВАТЬ: одним из возможных решений может быть обновление кольцевого буфера из цикла ввода-вывода, после обновления отправить индекс кольцевого буфера рабочим через общий канал для всех рабочих (тем самым передав контроль над этим индексом первому рабочему, который читает index от канала), пусть рабочий владеет этим индексом до конца обработки, а затем снова отправит номер индекса обратно в поток ввода-вывода через канал, тем самым вернув управление?

Мое приложение предназначено только для Linux, поэтому я могу использовать функции только для Linux, чтобы добиться этого наиболее элегантным способом. Кросс-платформенная поддержка не нужна, но требуется производительность и безопасность потоков.


person agnsaft    schedule 19.02.2014    source источник
comment
Я думаю, что у меня может быть полезное решение, но сначала нужно знать, как скоро вы узнаете длину одного кадра / пакета? Они фиксированной длины, включены ли они в заголовок пакета или вы знаете только в конце? Если вы знаете, что раньше, гораздо проще отказаться от работы, не загружая основной поток, но если вы не знаете до конца, основной поток неизбежно должен много читать.   -  person Vality    schedule 24.02.2014
comment
Привет, я знаю длину после recv и после итерации через буфер recv. К сожалению, они не фиксированной длины, и длина не отображается в пакете, а основана на кадрировании перевода строки.   -  person agnsaft    schedule 25.02.2014


Ответы (3)


При выполнении этой модели, поскольку мы знаем размер пакета только после того, как мы полностью получили пакет, к сожалению, мы не можем выгрузить само получение в рабочий поток. Вместо этого лучшее, что мы можем сделать, - это поток для получения данных, который должен будет передавать указатели на полностью полученные пакеты.

Сами данные, вероятно, лучше всего хранить в кольцевом буфере, однако нам понадобится отдельный буфер для каждого источника ввода (если мы получим частичный пакет, мы можем продолжить получение из других источников, не разделяя данные. Остается вопрос, как информировать когда новый пакет готов, и дать им указатель на данные в указанном пакете. Поскольку данных здесь мало, всего несколько указателей, наиболее элегантным способом сделать это будет с помощью очередей сообщений posix. Они обеспечивают возможность для нескольких отправителей и нескольких получателей писать и читать сообщения, всегда гарантируя получение каждого сообщения ровно 1 потоком.

Вам понадобится структура, подобная приведенной ниже, для каждого источника данных, сейчас я перейду к назначению полей.

struct DataSource
{
    int SourceFD;
    char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
    char *LatestPacket;
    char *CurrentLocation
    int SizeLeft;
};

SourceFD, очевидно, является файловым дескриптором рассматриваемого потока данных, DataBuffer - это место, где содержимое пакетов хранится во время обработки, это кольцевой буфер. Указатель LatestPacket используется для временного хранения указателя на наиболее часто отправленный пакет на случай, если мы получим частичный пакет и перейдем к другому источнику, прежде чем передать пакет. CurrentLocation хранит, где заканчивается последний пакет, чтобы мы знали, где разместить следующий или куда продолжить в случае частичного приема. Оставшийся размер - это место, оставшееся в буфере, он будет использоваться, чтобы определить, можем ли мы уместить пакет или нужно вернуться к началу.

Таким образом, функция приема будет эффективно

  • Скопируйте содержимое пакета в буфер
  • Переместите CurrentLocation, чтобы указать на конец пакета
  • Обновите SizeLeft, чтобы учесть теперь уменьшенный буфер
  • Если мы не можем поместить пакет в конец буфера, мы выполняем цикл
  • Если и там нет места, мы пробуем еще раз чуть позже, тем временем переходя к другому источнику.
  • Если бы у нас было частичное получение, сохраните указатель LatestPacket, чтобы указать на начало пакета и перейти к другому потоку, пока мы не получим остальные.
  • Отправьте сообщение с помощью очереди сообщений posix в рабочий поток, чтобы он мог обработать данные, сообщение будет содержать указатель на структуру DataSource, чтобы он мог работать с ней, ему также нужен указатель на пакет, над которым он работает, и его размер, их можно вычислить, когда мы получим пакет

Рабочий поток будет выполнять свою обработку, используя полученные указатели, а затем увеличивать SizeLeft, чтобы поток-получатель знал, что он может продолжить заполнение буфера. Атомарные функции потребуются для работы со значением размера в структуре, поэтому мы не получаем условий гонки с помощью свойства size (поскольку возможно, что он записывается одновременно рабочим и потоком ввода-вывода, что приводит к потерям записи, см. комментарий ниже), они перечислены здесь и просты и чрезвычайно полезны.

Теперь я дал некоторые общие сведения, но остановлюсь на конкретных моментах:

  1. Использование EventFD в качестве механизма синхронизации - в значительной степени плохая идея, вы обнаружите, что используете изрядное количество ненужного процессорного времени, и очень сложно выполнить какую-либо синхронизацию. В частности, если у вас есть несколько потоков, которые используют один и тот же дескриптор файла, у вас могут возникнуть серьезные проблемы. По сути, это неприятный прием, который иногда работает, но не может заменить правильную синхронизацию.
  2. Также плохая идея - попытаться разгрузить прием, как описано выше, вы можете обойти проблему со сложным IPC, но, честно говоря, маловероятно, что получение ввода-вывода займет достаточно времени, чтобы остановить ваше приложение, ваш ввод-вывод также, вероятно, намного медленнее, чем процессор поэтому получение с несколькими потоками мало что даст. (это предполагает, что вы не говорите, что имеете несколько 10-гигабитных сетевых карт).
  3. Использование мьютексов или блокировок здесь - глупая идея, она намного лучше подходит для кодирования без блокировки, учитывая небольшой объем (одновременно) общих данных, вы действительно просто передаете работу и данные. Это также повысит производительность принимающего потока и сделает ваше приложение более масштабируемым. Использование упомянутых здесь функций http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html, вы можете сделать это легко и просто. Если вы сделали это таким образом, вам понадобится семафор, его можно разблокировать каждый раз, когда пакет будет получен и заблокирован каждым потоком, который запускает задание, чтобы динамически разрешить больше потоков, если будет готово больше пакетов, что будет иметь гораздо меньше накладных расходов, чем решение homebrew с мьютексами.
  4. Здесь нет особой разницы с любым пулом потоков, вы создаете много потоков, а затем все они блокируются в mq_receive в очереди сообщений данных для ожидания сообщений. По завершении они отправляют свой результат обратно в основной поток, который добавляет очередь сообщений с результатами в свой список epoll. Затем он может получать результаты таким образом, это просто и очень эффективно для небольших полезных данных, таких как указатели. Это также будет использовать мало ЦП и не заставит основной поток тратить время на управление рабочими.

Наконец, ваше редактирование довольно разумно, за исключением того факта, что, как я уже сказал, очереди сообщений здесь намного лучше, чем каналы, поскольку они очень эффективно сигнализируют о событиях, гарантируют полное чтение сообщения и обеспечивают автоматическое формирование кадра.

Надеюсь, это поможет, но уже поздно, поэтому, если я что-то пропустил или у вас есть вопросы, не стесняйтесь комментировать, чтобы уточнить или получить дополнительные объяснения.

person Vality    schedule 25.02.2014
comment
Спасибо за длинный ответ. Всего несколько вопросов: могу ли я предположить, что несколько потоков могут блокироваться в одной очереди, чтобы дождаться новых задач? Могут ли также несколько потоков писать в другую очередь для передачи завершенной работы обратно? В таком дизайне действительно нужны встроенные функции, описанные выше? - person agnsaft; 26.02.2014
comment
@invictus Очереди сообщений действительно являются отношениями «многие ко многим», они чрезвычайно мощны, поскольку любое количество отправителей и любое количество получателей могут использовать очередь, и сообщения всегда будут передаваться точно в один из потоков прослушивания. Вышеупомянутые встроенные функции не нужны для подавляющего большинства кода, единственное их использование - гарантировать, что SizeLeft обновляется атомарно, чтобы поток-получатель и рабочие потоки не обновляли его одновременно и не приводили к его повреждению, например: thread1 загружает value, thread2 загружает значение, thread1 записывает его, thread2 записывает, запись thread1 теряется. - person Vality; 26.02.2014
comment
Похоже, это решение, которое я тогда искал. Вы знаете, как работает mq_ * (т.е. накладные расходы на производительность)? - person agnsaft; 26.02.2014
comment
@invictus Хотя производительность зависит от вашего конкретного приложения и конкретной версии ядра, вы можете ожидать, что они обеспечат гораздо более высокую производительность, чем что-то вроде сокета, но немного медленнее, чем заказная система очереди с общей памятью, однако они полностью потокобезопасны и сэкономит вам много времени на проектирование. Что еще более важно, единственный раз, когда они становятся медленными, - это очень большие сообщения (поскольку сообщение хранится внутри дважды, один раз в памяти отправителя и один раз в памяти получателя), поэтому, если вы передаете только указатели и небольшие сообщения, накладные расходы незначительны. - person Vality; 26.02.2014
comment
Кроме того, я забыл упомянуть, что очереди сообщений posix гарантированно никогда не блокируют отправителя при попытке отправки, передача данных выполняется асинхронно. Это означает, что никакие накладные расходы никогда не замедлят поток ввода-вывода. - person Vality; 26.02.2014
comment
Я понимаю. Я предполагаю, что для меня не может быть и речи о системе очереди с общей памятью, поскольку я хочу, чтобы результаты возвращались в основной цикл ввода-вывода, и мне нужен какой-то способ ожидания событий как от рабочего потока, так и от FD ввода-вывода. Мне все еще кажется, что ваше предложение - это верный путь :) - person agnsaft; 26.02.2014
comment
Я думаю, пайп также может работать, если кто-то хочет сделать более портальное решение? - person agnsaft; 26.02.2014
comment
@invictus К сожалению, канал не является потокобезопасным в этом смысле, у вас нет никаких гарантий относительно кадрирования. Очередь сообщений гарантирует, что 1 сообщение целиком достигнет одного потока, с конвейером, если сообщение превышает 1 байт, оно может чередоваться неопределенным образом, и несколько потоков могут каждый читать часть пакета, создавая то, что, вероятно, будет немедленным. segfault, когда вы затем пытаетесь разыменовать размер как указатель. Очереди сообщений на самом деле очень портативны и поддерживаются практически в каждой системе Posix. - person Vality; 26.02.2014

В моих тестах один экземпляр epoll на поток намного превосходил сложные модели потоковой передачи. Если сокеты слушателя добавлены ко всем экземплярам epoll, рабочие будут просто accept(2), а победитель получит соединение и обработает его в течение всего срока его службы.

Ваши рабочие могут выглядеть примерно так:

for (;;) {
    nfds = epoll_wait(worker->efd, &evs, 1024, -1);

    for (i = 0; i < nfds; i++)
        ((struct socket_context*)evs[i].data.ptr)->handler(
            evs[i].data.ptr,
            evs[i].events);
}

И каждый дескриптор файла, добавленный к экземпляру epoll, может иметь struct socket_context связанный с ним:

void listener_handler(struct socket_context* ctx, int ev)
{
    struct socket_context* conn;

    conn->fd = accept(ctx->fd, NULL, NULL);
    conn->handler = conn_handler;

    /* add to calling worker's epoll instance or implement some form
     * of load balancing */
}

void conn_handler(struct socket_context* ctx, int ev)
{
    /* read all available data and process. if incomplete, stash
     * data in ctx and continue next time handler is called */
}

void dummy_handler(struct socket_context* ctx, int ev)
{
    /* handle exit condition async by adding a pipe with its
     * own handler */
}

Мне нравится эта стратегия, потому что:

  • очень простой дизайн;
  • все нити идентичны;
  • рабочие и связи изолированы - нельзя наступать друг другу на пятки или звонить read(2) не тому работнику;
  • никаких блокировок не требуется (ядро заботится о синхронизации на accept(2));
  • в некоторой степени естественная балансировка нагрузки, поскольку ни один занятый работник не будет активно бороться за accept(2).

И несколько замечаний по epoll:

  • использовать режим запуска по фронту, неблокирующие сокеты и всегда читать до EAGAIN;
  • избегайте dup(2) семейства вызовов, чтобы избавить себя от некоторых сюрпризов (epoll регистрирует дескрипторы файла, но фактически отслеживает описания файлов);
  • вы можете безопасно epoll_ctl(2) экземпляры epoll других потоков;
  • используйте большой struct epoll_event буфер для epoll_wait(2), чтобы избежать голода.

Некоторые другие примечания:

  • используйте accept4(2), чтобы сохранить системный вызов;
  • используйте один поток на ядро ​​(1 для каждого физического, если привязан к ЦП, или 1 для каждого логического, если привязан к вводу-выводу);
  • _14 _ / _ 15_ будет быстрее, если количество подключений мало.

Надеюсь, это поможет.

person haste    schedule 20.02.2014
comment
Мне нравится эта идея, однако меня беспокоит, что моя большая рабочая нагрузка после каждого recv заблокирует другие соединения. Кроме того, не приведет ли это к несбалансированной рабочей нагрузке для каждого потока, если потоку посчастливится сначала выбрать следующее принятие? Более того, если у меня всего 4-5 соединений, мне все равно может понадобиться 30 рабочих потоков для обработки того, что они производят. - person agnsaft; 20.02.2014
comment
@invictus Да, рабочая нагрузка не будет идеально сбалансирована, если вы сами не распределите соединения равномерно в обработчике слушателя, что может добавить некоторую сложность. Ваша работа связана с процессором или вводом-выводом? Больше потоков, чем ядер процессора, просто вызовет большее переключение контекста, если оно связано с процессором. - person haste; 20.02.2014
comment
@pindumb Не будет проблемой для небольшого количества потоков (по одному на физическое ядро). Под нагрузкой есть вероятность, что только небольшая часть потоков будет засвидетельствована читаемым слушателем. Если это вызывает беспокойство, слушатель может переключаться между потоками. Это была бы другая история с сотнями или тысячами нитей. - person haste; 21.02.2014
comment
Я считаю, хотя это интересная идея, что этот конкретный дизайн не подходит, поскольку я считаю, что один поток должен иметь возможность обрабатывать все операции ввода-вывода, в то время как фактическая обработка сильно зависит от ЦП. Я добавлю награду через 14 часов, потому что я много лет искал хороший ответ на этот вопрос, и все, что я нашел, это то, что другие задаются тем же вопросом. Было бы неплохо получить ответ из учебника, который можно было бы найти в Google :) - person agnsaft; 21.02.2014
comment
@invictus Это круто. Я делал что-то подобное, но с работой, связанной с вводом-выводом. Я попробовал 1 поток слушателя + N рабочих с одиночными и отдельными рабочими очередями FIFO без блокировки. Что убивало производительность, так это подпрыгивание строки кэша и выяснение того, когда и как спать и будить рабочих. Изоляция была действительно ключом к 10-100-кратному ускорению. Однако совместное использование записываемых данных может быть незначительным в вашем случае, если обработка занимает много времени. Удачи. :) - person haste; 21.02.2014

Я отправляю тот же ответ в другом сообщении: Я хочу дождаться и дескриптора файла, и мьютекса. Каков рекомендуемый способ сделать это?

==========================================================

Это очень распространенная проблема, особенно когда вы разрабатываете сетевую серверную программу. Основной вид большинства серверных программ Linux будет выглядеть следующим образом:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

Это однопоточный (основной поток) серверный фреймворк на основе epoll. Проблема в том, что он однопоточный, а не многопоточный. Это требует, чтобы proc () никогда не блокировался или не запускался в течение значительного времени (скажем, 10 мс для общих случаев).

Если proc () когда-либо будет работать в течение длительного времени, НАМ НУЖНЫ НЕСКОЛЬКО ПОТОКОВ и выполнение proc () в отдельном потоке (рабочем потоке).

Мы можем отправить задачу рабочему потоку, не блокируя основной поток, используя очередь сообщений на основе мьютексов, это достаточно быстро.

Затем нам нужен способ получить результат задачи из рабочего потока. Как? Однако если мы просто проверим очередь сообщений напрямую, до или после epoll_wait (), действие проверки будет выполнено после завершения epoll_wait (), а epoll_wait () обычно блокируется на 10 микросекунд (общие случаи), если все файловые дескрипторы ожидают не активны.

Для сервера 10 мс - довольно много времени! Можем ли мы сигнализировать epoll_wait () о немедленном завершении работы, когда сгенерирован результат задачи?

Да! Я опишу, как это делается, в одном из моих проектов с открытым исходным кодом.

Создайте канал для всех рабочих потоков, и epoll также будет ожидать на этом конвейере. Как только результат задачи сгенерирован, рабочий поток записывает один байт в конвейер, после чего epoll_wait () завершится примерно за то же время! - Linux pipe имеет задержку от 5 до 20 мкс.

В моем проекте SSDB (дисковая база данных NoSQL, совместимая с протоколом Redis), я создаю SelectableQueue для передачи сообщения между основным потоком и рабочими потоками. Как и его название, SelectableQueue имеет файловый дескриптор, который может ждать epoll.

SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

Использование в основном потоке:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is worker_thread){
            sock, resp = worker->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            worker->add_task(fd, req);
        }
    }
}

Использование в рабочем потоке:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);
person ideawu    schedule 14.09.2017