Kafka Streams - агрегирование и объединение пользователей с адресами

У меня две уплотненные темы. Один содержит всю информацию о моем пользователе (USERID), а другой сохраняет их адреса (USERID, ADRESSID). В скобках указаны ключи. Я хочу сохранить данные пользователей только в одной теме с их списком адресов. Мой подход таков:

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key 
At the end, I am doing a leftJoin on user and aggregated_addresses over USERID and saving the result to a compacted topic called "user_addresses".

Я хочу добиться сохранения всех данных с их адресами в user_addresses. Это означает, что я не хочу терять адреса через какое-то время. Только если в базе данных был удален адрес. Мой вопрос в том, подходит ли мой подход для достижения этой цели. Мой прототип работает, и он сохраняет список адресов для каждого пользователя, но я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через какое-то время или нет.

Может быть, кто-нибудь подробно объяснит мне, как работает этот конвейер. Если новый поток (адрес) поступает, он проходит через весь конвейер (selectKey, groupByKey, aggregate) и попадает в тему aggregated_addresses, где сохраняется как список адресов? Агрегат шага использует этот оператор:

(user, address, queue) -> {...}

Используют ли потоки Kafka агрегированные_адреса для заполнения очереди указанного выше оператора? Я, если появится новый поток .aggregate, будет ли Kafka искать их соответствующие агрегированные списки в aggregated_addresses и заполнять очередь этими данными? Или он использует сгруппированные потоки .groupByKey, и каждый раз, когда приходит новый поток, весь сгруппированный поток отправляется для агрегирования? Если второе верно, удалит ли KGroupedStream некоторые потоки, например, через неделю? Если да, то какие-то адреса будут отсутствовать в очереди?

В чем внутренняя разница между KGroupedStream и KGroupedTable?

Интересно, что результат после соединения (в сжатой теме с именем user_addresses) содержит больше записей, чем таблица записей пользователя. Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом несколько вхождений (несколько смещений). При наименьшем смещении у этого пользователя нет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а при наибольшем смещении он имеет два адреса в своем списке. Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую уплотненную тему. Работает ли уплотнение Кафки как сборщик мусора, который впоследствии удаляет данные? Что, если я ищу ключ, получу ли я ключ с наибольшим смещением?

Прошу прощения за так много вопросов, но поскольку я все больше и больше работаю с потоками, некоторые вещи мне непонятны.

Заранее спасибо за помощь! :)


person Tomas Musil    schedule 03.12.2018    source источник


Ответы (1)


Я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через некоторое время или нет.

Он ничего не удалит.

Если я понимаю остальную часть вашего вопроса, вы спрашиваете, как работает оператор aggregate(). Он использует локальное хранилище состояний (реализованное с помощью RocksDB) для хранения <userId, X>, где X - это то, что возвращает ваш агрегированный UDF ((user, address, queue) -> { }), т. Е. Он должен быть X == queue). Таким образом, каждая входная запись выполняет локальный поиск в RocksDB для получения текущего queue, обновляет его, записывает обратно в RocksDB и отправляет его вниз по потоку в ваш оператор to(), который записывает его также в тему результатов.

Также прочтите документацию для получения дополнительных сведений: https://kafka.apache.org/21/documentation/streams/ Также есть много других материалов о Kafka Streams и о том, как они работают в Интернете (сообщения в блогах, записи выступлений, слайды ...)

Интересно, что результат после соединения (в сжатой теме с именем user_addresses) содержит больше записей, чем таблица записей пользователя. Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом есть несколько вхождений (несколько смещений). При наименьшем смещении у этого пользователя нет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а при наибольшем смещении он имеет два адреса в своем списке. Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую уплотненную тему. Работает ли уплотнение Кафки как сборщик мусора, который впоследствии удаляет данные? Что делать, если я ищу ключ, получу ли я ключ с наибольшим смещением?

Сжатие выполняется асинхронно в фоновом режиме, но не сразу. Также обратите внимание, что тематические (или, точнее,) разделы разбиты на «сегменты», а активный сегмент никогда не сжимается (размер сегмента по умолчанию - 1 ГБ). Вы можете настроить размер сегмента и то, как часто запускается сжатие (дополнительные сведения см. В документации: https://kafka.apache.org/documentation/#compaction).

Что делать, если я ищу ключ, получу ли я ключ с наибольшим смещением?

Не уверен, что вы имеете в виду. Kafka допускает только последовательное чтение, но не поиск ключей. Таким образом, вам нужно будет прочитать тему от начала до конца, чтобы найти последнюю версию ключа. Если вы обратитесь к функции «Интерактивные запросы» Kafka Streams, она будет запрашивать локальную базу данных RocksDB и, таким образом, содержать последнюю запись для каждого ключа.

Мой вопрос в том, подходит ли мой подход для достижения этой цели.

Да, с одной важной деталью, связанной с

В чем внутренняя разница между KGroupedStream и KGroupedTable?

Поскольку вы вводите тему - это сжатая тема, в которой используются ключи (userId,addressId), вы должны читать ее как table() (а не stream()):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
    builder.table("address-topic")
      .selectKey(...) //Selecting USERID as key - this generates KStream
      .groupBy(...) //Select USERID as and group by USERID
      .aggregate(...) //Aggregating by USERID as key - this generates KTable
      .to("aggregated_addresses"); //KTable with USERID as key 

Разница в том, что если вы читаете тему KStreams, это интерпретируется как «факты», и поэтому семантика удаления отсутствует. Однако вводимая вами тема содержит записи «обновлений» и, следовательно, должна быть потребителем как таковая. KGroupedStream и KGroupedTable - это просто промежуточные объекты в API, которые также подразумевают семантику «факт» и «обновление». Опять же, проверьте документы и другие материалы в Интернете для получения более подробной информации.

person Matthias J. Sax    schedule 04.12.2018
comment
Большое спасибо за отличный и подробный ответ! Я до сих пор не понимаю, почему в моей теме так много записей с повторяющимися ключами. Вчера я импортировал свои данные, присоединился к пользователю с агрегированными адресами, а сегодня у меня по-прежнему много дубликатов. Когда я ищу ключ с помощью Lenses (графический интерфейс Landoop), я часто получаю в результате несколько записей (например, историю). Первая запись не имеет адреса, вторая запись имеет один адрес, а третья запись имеет два адреса. У каждой записи один и тот же ключ. Segment.size = 200000, тема уплотнена. Как вы думаете, что может быть причиной этого, что нужно изменить? - person Tomas Musil; 04.12.2018
comment
Все ли эти записи находятся в активном сегменте? - person Matthias J. Sax; 04.12.2018
comment
Как узнать, какой сегмент активен? - person Tomas Musil; 07.12.2018
comment
Активен только самый последний / самый новый сегмент в разделе. - person Matthias J. Sax; 08.12.2018