nanoQ (сокращенно nQ) был создан для потоковой передачи данных в реальном времени: аудио, видео, клики, датчики - где потерять данные - это нормально, чтобы получить самую последнюю информацию. сообщения быстрее.

Основная цель - мгновенная публикация «запустил и забыл» с гарантией максимальной доставки и поддержкой множества (сотен, тысяч) одновременных потоков.

Если вы программист типа покажи мне код, без дополнительной оплаты github.com/aigent/nq

Обоснование

Мы твердо уверены, что хорошие программные продукты были созданы для решения одной конкретной проблемы (и только потом их раздувают и злоупотребляют не по назначению).

В Aigent мы используем nQ для передачи аудио и сигналов VoIP через нашу внутреннюю инфраструктуру. Существующие брокеры сообщений не слишком хорошо подходят для этого сценария. Если сервис хочет передать данные другому сервису, схема:

Pub → broker → Sub

создает дополнительную обратную связь с увеличенной задержкой и нагрузкой на оборудование.

В некоторых случаях центральный брокер может стать узким местом, особенно при смешанной рабочей нагрузке: быстрые и медленные клиенты, большие и маленькие сообщения, попытки масштабирования брокера могут привести к боли в шее.

Вместо этого издатели nQ подключаются напрямую к подписчикам.

    /→ Sub1
Pub  → Sub2
    \→ Sub3

Этот подход очень упрощен, но эффективен: вы можете создавать масштабируемые конвейеры произвольной сложности. Объедините это с Kubernetes или маршрутизацией сетей облачных сервисов и автоматическим масштабированием, и вы станете золотыми.

В приведенном выше сценарии служба Pub публикуется в Sub1, Sub2 и Sub3 одновременно, представьте, что службы Sub1 и Sub2 работают быстро, а Sub3 работает медленно или даже недоступно. Какие у нас есть варианты?

  1. Замедление обработки для всех подписчиков и создание противодавления.
  2. Буферизируйте входящие сообщения, пока Sub3 не сможет их успеть.
  3. Доставляйте сообщения в Sub1 и Sub2 в их темпе и отбрасывайте сообщения, направленные в Sub3 - это то, что делает nanoQ.

Это простая реализация постепенной деградации - общий конвейер продолжает работать, несмотря на то, что некоторые службы не работают или работают со сбоями, и все это с очень низкими затратами на обслуживание (вам не нужно отслеживать растущие очереди или преследовать самый медленный компонент).

В нашем случае это работает - мы все равно хотим, чтобы полученные данные были свежими. В Aigent мы отправляем подсказки агентам колл-центра в режиме реального времени во время разговора, и получить подсказку о том, о чем вы говорили несколько минут назад, бесполезно - это в лучшем случае раздражает.

Как работает nanoQ

Я бы хотел, чтобы в каждом программном проекте было описание внутренней кухни и дизайнерского решения в один-два абзаца вместо тысяч страниц маркетинговых материалов. Итак, поехали.

В основе nanoQ есть несколько впечатляющих неудач, испытаний и ошибок, которые в конечном итоге сделали это.

В двух словах

TCP против UDP

Вы можете подумать, что UDP имеет низкую задержку и конструктивно «запустил и забыл», поэтому это должен быть очевидный выбор. Против этого есть две причины: потеря пакетов и перегрузка.

UDP слишком часто отбрасывает пакет: слишком высокая скорость отправки, ЦП занят другим процессом, очередь ядра заполнена. Мы хотим отбрасывать пакеты в крайнем случае, когда приемник недоступен, но мы не хотим иметь дело с трещинами, шипением и отсутствующими фрагментами звука из-за того, что процессор на одной из машин был занят.

Еще труднее бороться с заторами. Представьте, что у вас есть сеть со скоростью 100 Мбит / с, и вы отправляете со скоростью 80 Мбит / с, потому что, эй, оборудование должно работать за деньги. Внезапно пропускная способность падает до 15 Мбит / с. Первый вопрос: откуда мы знаем, что он упал? Второй вопрос: что делать? Мы не можем продолжать отправлять с прежней скоростью 80 Мбит / с. В худшем случае у нас будет застойный коллапс, когда ссылка будет доступна, но из-за превышения входящего трафика ни один пакет не сможет достичь пункта назначения до истечения тайм-аута. Хуже всего то, что пропускная способность может вернуться к предыдущему значению 100 Мбит / с, но из-за существующей перегрузки пункты назначения могут казаться недоступными, или данные могут поступать слишком поврежденными, чтобы быть значимыми.

Сетевые протоколы, построенные на основе UDP, обычно имеют дополнительные протоколы или сообщения для управления полосой пропускания, предотвращения дрожания, управления QoS, повторных попыток при потере пакетов. Например, RTP - транспортный протокол реального времени имеет RTCP - протокол управления RTP.

Эти протоколы довольно сложные. Но потом - TCP делает все это, и часто очень эффективно на аппаратном уровне.

В нашем случае мы можем выдержать задержку в сотни миллисекунд, и мы хотим передавать тысячи потоков по одному и тому же каналу, поэтому мы позволяем TCP взять на себя всю тяжелую работу и использовать все существующие диагностические инструменты.

Мы используем TCP / IP, и каждый поток имеет собственное соединение для использования полосы пропускания.

Протокол связи

В протоколе есть только одно сообщение: передача данных, и это очень просто:

+--...---+---------...
|<length>| payload ...
+--...---+---------...

‹length› - это Варинт 1–5 октетов в зависимости от размера полезной нагрузки.

Варианты - это метод сериализации целых чисел с использованием одного или нескольких байтов. Меньшие числа занимают меньшее количество байтов.

Каждый байт в переменной, кроме последнего, имеет установленный старший бит (msb) - это указывает на то, что есть еще байты. Младшие 7 бит каждого байта используются для хранения двух дополнительных представлений числа в группах по 7 бит, сначала наименее значимая группа.

Таким образом, для полезной нагрузки размером 0–127 байт требуется только 1 дополнительный байт метаданных.

Мы используем nanoQ в существующей внутренней инфраструктуре, где уже есть проверка целостности, маршрутизация и шифрование. Этот минималистичный протокол легко реализовать практически на любом языке программирования.

API v0 и подробности реализации

В самом начале мы решили, что nanoQ не должен обращать внимания на данные: некоторые из наших сервисов используют JSON, некоторые - Protobuf или msgpack - все, что работает лучше всего.

API издателя:

Publish(ctx context.Context, payload []byte, streamKey interface{}) error

Итак, наша полезная нагрузка - это просто кусок байтов. streamKey определяет, использовать ли существующее соединение или создать новое - каждое новое streamKey является новым соединением.

API v0 нашего подписчика:

type IncomingMsg struct {
    Payload            []byte
    StreamDescriptor   interface{}
}
Subscribe(ctx context.Context) <-chan IncomingMsg

И использовать go-каналы в качестве внутренних очередей было несложно. Все время было chan [] byte.

Оказалось, что у этой конструкции есть несколько критических недостатков.

Начнем с издателя.

payload := []byte(“Example payload”)
// We store the message into the internal queue
Publish(ctx, payload, “example”)
payload[3] = 0 // Wait! What?! This is illegal!!!

Поскольку срез по сути является указателем, мы не можем просто отправить полезную нагрузку по каналу - пользователь все равно может изменить содержимое, а затем:

  1. это состояние гонки, и
  2. мы передаем испорченные данные

Итак, на этом этапе нам пришлось либо объяснить каждому пользователю, чтобы он не делал этого, пожалуйста, или выделять копию каждого входящего сообщения (угадайте, что мы выбрали, и напишите комментарий).

Со стороны подписчика у нас была та же проблема - API обещал, что каждое входящее сообщение будет доставляться в виде отдельного фрагмента [] byte, поэтому нам также пришлось выделить память для каждого входящего пакета.

Есть проблема с выделением? Идти не волшебно. Нет ничего волшебного. Наша работа как инженеров - бороться с магией и объяснять, как все работает на самом деле. По крайней мере, правдоподобное объяснение - никакой магии.

Go делает все возможное, чтобы убедить нас в том, что сборщика мусора и среды выполнения не существует (и это удивительно убедительно). Но когда вы выделяете здесь и там пару сотен тысяч раз в секунду, сборщики мусора внезапно захотят поговорить с вами.

О, боже, загрузка процессора была высокой, как змей.

Редизайн и текущий API

Столпом нового дизайна стал кольцевой буфер - мы использовали его для внутренних очередей вместо chan [] byte.

Так что мы можем читать и писать, используя одну и ту же заранее выделенную область памяти.

API издателя остался прежним:

Publish(ctx context.Context, payload []byte, streamKey interface{}) error

Мы просто копируем содержимое payload в кольцевой буфер уже в формате проводного протокола. Фоновый поток очистит буфер в зависимости от параметра FlushFrequency. Этот параметр и параметры NoDelay (отключение или включение алгоритма Нэгла) управляют задержкой по сравнению с «ползунком» полосы пропускания.

API подписчика стал более io.Reader -подобным:

Receive(ctx context.Context, p []byte) (payload []byte, stream StreamDescriptor, err error)

Полезная нагрузка - это тот же фрагмент, что и p, но длины полученного сообщения.

Имея этот API, пользователь может решить, следует ли повторно использовать одну и ту же память для каждого входящего сообщения или выделить ее.

Встроенные метрики Prometheus

Эта часть может показаться излишней для библиотеки, называя себя минималистичной.
nanoQ может потерять данные, и мы хотим, чтобы пользователь знал, так ли это. Обычный способ общения программы с программистом - ведение журнала не работает. Если вы теряете тысячи сообщений в секунду, создавать тысячи строк журнала - не лучшая идея. И именно по этой причине были изобретены метрики.

Мы надеемся, что nanoQ решит и некоторые из ваших проблем. Раздвиги нас на GitHub.