kafka-streams join создает дубликаты

У меня есть две темы:

// photos
{'id': 1, 'user_id': 1, 'url': 'url#1'},
{'id': 2, 'user_id': 2, 'url': 'url#2'},
{'id': 3, 'user_id': 2, 'url': 'url#3'}

// users
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'},
{'id': 1, 'name': 'user#1'}

Я создаю фото карты пользователем

KStream<Integer, Photo> photo_by_user = ...

photo_by_user.to("photo_by_user")

Затем я пытаюсь присоединиться к двум таблицам:

KTable<Integer, User> users_table = builder.table("users");
KTable<Integer, Photo> photo_by_user_table = builder.table("photo_by_user");
KStream<Integer, Result> results = users_table.join(photo_by_user_table, (a, b) -> Result.from(a, b)).toStream();

results.to("results");

результат как

{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}
{'photo_id': 1, 'user': 1, 'url': 'url#1', 'name': 'user#1'}
{'photo_id': 2, 'user': 2, 'url': 'url#2', 'name': 'user#2'}
{'photo_id': 3, 'user': 3, 'url': 'url#3', 'name': 'user#3'}

Я вижу, что результаты дублируются. Почему и как этого избежать?


person mike    schedule 10.07.2018    source источник


Ответы (1)


Вы можете столкнуться с известной ошибкой. При "чистом" объединении KTable-KTable могут возникать дубликаты. Обратите внимание, что эти дубликаты, строго говоря, не являются неверными, потому что результатом является поток обновлений, и обновление «A» до «A» не меняет результат. Конечно, нежелательно получать эти дубликаты. Попробуйте отключить кэширование — без кэширования «проблем со сбросом» возникнуть не должно.

person Matthias J. Sax    schedule 11.07.2018
comment
Маттиас: У меня похожая проблема, но с отключенным кэшированием. Я написал о проблеме здесь. Есть ли у вас понимание этого вопроса? Заранее спасибо. - person Freestyle076; 20.07.2018