параллельная асинхронная_запись. есть ли решение без ожидания?

async_write() запрещено вызывать одновременно из разных потоков. Он отправляет данные порциями, используя async_write_some, и такие порции можно чередовать. Таким образом, пользователь должен позаботиться о том, чтобы не вызывать async_write() одновременно.

Есть ли лучшее решение, чем этот псевдокод?

void send(shared_ptr<char> p) {
  boost::mutex::scoped_lock lock(m_write_mutex);
  async_write(p, handler);
}

Мне не нравится идея блокировать другие потоки на довольно долгое время (в моем приложении ~ 50Mb отправки).

Может что-то подобное сработает?

void handler(const boost::system::error_code& e) {
  if(!e) {
    bool empty = lockfree_pop_front(m_queue);
    if(!empty) {
      shared_ptr<char> p = lockfree_queue_get_first(m_queue);
      async_write(p, handler);
    }
  }
}

void send(shared_ptr<char> p) {
  bool q_was_empty = lockfree_queue_push_back(m_queue, p)
  if(q_was_empty)
    async_write(p, handler);
}

Я бы предпочел найти готовый рецепт из поваренной книги. Разобраться с lock-free непросто, может появиться много малозаметных багов.


person user222202    schedule 07.05.2011    source источник
comment
Предполагая, что передача выполняется максимально быстро, в чем преимущество чередования данных из нескольких потоков? Это не ускорит общую скорость передачи, и потоки в среднем не будут заканчиваться быстрее.   -  person Bo Persson    schedule 07.05.2011
comment
весь буфер, переданный в async_write(), должен быть отправлен как непрерывный блок. он имеет некоторую структуру. представьте что-то вроде http-ответа с заголовком и содержимым файла. если есть параллельные async_write(), структура будет нарушена.   -  person user222202    schedule 07.05.2011
comment
Из документации boost: эта операция реализуется с точки зрения нулевого или более вызовов функции потока async_write_some и известна как составная операция. Программа должна гарантировать, что поток не выполняет никаких других операций записи (таких как async_write, функция потока async_write_some или любые другие составные операции, выполняющие запись), пока эта операция не завершится.   -  person user222202    schedule 07.05.2011


Ответы (2)


async_write() запрещено вызывать одновременно из разных потоков

Это утверждение не совсем верно. Приложения могут свободно вызывать async_write одновременно, если они находятся на разных объектах socket.

Есть ли лучшее решение, чем этот псевдокод?

void send(shared_ptr<char> p) {
  boost::mutex::scoped_lock lock(m_write_mutex);
  async_write(p, handler);
}

Это, вероятно, не выполняет то, что вы намеревались, поскольку async_write возвращается немедленно. Если вы хотите, чтобы мьютекс был заблокирован на все время операции записи, вам нужно будет держать scoped_lock в области действия до тех пор, пока не будет вызван обработчик завершения.

Есть более удобные решения этой проблемы, библиотека имеет встроенную поддержку, использующую концепцию strand. Он прекрасно вписывается в этот сценарий.

Цепочка определяется как строго последовательный вызов обработчиков событий (т. е. без параллельного вызова). Использование потоков позволяет выполнять код в многопоточной программе без необходимости явной блокировки (например, с использованием мьютексов).

Использование явного потока здесь гарантирует, что ваши обработчики вызываются только одним потоком, который вызвал io_service::run(). В вашем примере элемент m_queue будет защищен цепочкой, что обеспечит атомарный доступ к очереди исходящих сообщений. После добавления записи в очередь, если ее размер равен 1, это означает, что не выполняется незавершенная операция async_write, и приложение может инициировать операцию, обернутую через цепь. Если размер очереди больше 1, приложение должно дождаться завершения async_write. В обработчике завершения async_write удалите запись из очереди и при необходимости обработайте любые ошибки. Если очередь не пуста, обработчик завершения должен инициировать еще один async_write из начала очереди.

Это гораздо более чистый дизайн, который разбрызгивает мьютексы в ваших классах, поскольку он использует встроенные конструкции Asio по назначению. Этот другой ответ, который я написал, содержит некоторый код, реализующий этот дизайн.

person Sam Miller    schedule 09.05.2011
comment
вы уверены насчет strand? Я считаю, что вопрос заключается в том, как асинхронно отправлять пакеты без чередования. Во что я верю, и у меня такая же проблема, у него есть один поток, вызывающий io_service::run(), и какой-то другой поток, который хочет написать какой-то ответ. Например, 2 других потока выдают async_write() для одного и того же объекта socket, порядок фактических вызовов async_write_some() не определен. Только strand гарантирует, что будут сериализованы только обработчики завершения этих операций записи, что так и есть, потому что в любом случае только один поток вызвал io_service::run(). Комментарии приветствуются. - person Dragomir Ivanov; 28.11.2012
comment
@DragomirIvanov да, эта цепочка необходима для обеспечения доступа к очереди исходящих сообщений, используемой для сериализации вызовов async_write. Я обновил свой ответ, чтобы уточнить. - person Sam Miller; 18.11.2013

Мы решили эту проблему, создав отдельную очередь данных для записи в нашем объекте сокета. Когда первый фрагмент данных для записи поставлен в очередь, мы запускаем async_write(). В обработчике завершения нашего async_write мы запускаем последующие async_write операции, если есть еще данные для передачи.

person Chad    schedule 19.05.2011
comment
Этот. Использование нитей, предложенное Сэмом Миллером, может гарантировать, что вы не получите одновременный доступ к очереди отправки, что означает отсутствие необходимости в мьютексах. Таким образом, это решение будет работать без ожидания в соответствии с запросом, если оно реализовано правильно. - person ComicSansMS; 06.09.2013