У меня две уплотненные темы. Один содержит всю информацию о моем пользователе (USERID), а другой сохраняет их адреса (USERID, ADRESSID). В скобках указаны ключи. Я хочу сохранить данные пользователей только в одной теме с их списком адресов. Мой подход таков:
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
Я хочу добиться сохранения всех данных с их адресами в user_addresses. Это означает, что я не хочу терять адреса через какое-то время. Только если в базе данных был удален адрес. Мой вопрос в том, подходит ли мой подход для достижения этой цели. Мой прототип работает, и он сохраняет список адресов для каждого пользователя, но я спрашиваю себя, удалит ли KGroupedStream некоторые потоки через какое-то время или нет.
Может быть, кто-нибудь подробно объяснит мне, как работает этот конвейер. Если новый поток (адрес) поступает, он проходит через весь конвейер (selectKey, groupByKey, aggregate) и попадает в тему aggregated_addresses, где сохраняется как список адресов? Агрегат шага использует этот оператор:
(user, address, queue) -> {...}
Используют ли потоки Kafka агрегированные_адреса для заполнения очереди указанного выше оператора? Я, если появится новый поток .aggregate, будет ли Kafka искать их соответствующие агрегированные списки в aggregated_addresses и заполнять очередь этими данными? Или он использует сгруппированные потоки .groupByKey, и каждый раз, когда приходит новый поток, весь сгруппированный поток отправляется для агрегирования? Если второе верно, удалит ли KGroupedStream некоторые потоки, например, через неделю? Если да, то какие-то адреса будут отсутствовать в очереди?
В чем внутренняя разница между KGroupedStream и KGroupedTable?
Интересно, что результат после соединения (в сжатой теме с именем user_addresses) содержит больше записей, чем таблица записей пользователя. Я посмотрел глубже и увидел, что у пользователя с одним и тем же ключом несколько вхождений (несколько смещений). При наименьшем смещении у этого пользователя нет адресов, затем при более высоком смещении он имеет один адрес в своем списке, а при наибольшем смещении он имеет два адреса в своем списке. Я снова спрашиваю себя, почему старые смещения не удаляются автоматически, когда я использую уплотненную тему. Работает ли уплотнение Кафки как сборщик мусора, который впоследствии удаляет данные? Что, если я ищу ключ, получу ли я ключ с наибольшим смещением?
Прошу прощения за так много вопросов, но поскольку я все больше и больше работаю с потоками, некоторые вещи мне непонятны.
Заранее спасибо за помощь! :)