Kafka Streams TimestampExtractor

Привет всем, у меня есть вопрос о TimestampExtractor и Kafka Streams ....

В нашем приложении есть возможность получать неупорядоченные события, поэтому я предпочитаю упорядочивать события в зависимости от бизнес-даты внутри полезной нагрузки, а не по моменту времени, который они помещают в тему.

Для этой цели я запрограммировал пользовательский TimestampExtractor, чтобы иметь возможность извлекать временную метку из полезной нагрузки. Все, что я сказал здесь, работало отлично, но когда я построил KTable для этой темы, я обнаружил, что событие, которое я получаю не по порядку (с точки зрения бизнеса, это не последнее событие, но оно получено в конце), отображается как последнее состояние объекта, в то время как ConsumerRecord имеет метку времени из полезной нагрузки.

Я не знаю, может быть, я ошибался, полагая, что Kafka Stream исправит эту неисправную проблему с помощью TimestampExtractor.

Затем во время отладки я увидел, что если TimestampExtractor возвращает -1 в результате, Kafka Streams игнорирует сообщение, а TimestampExtractor также доставляет метку времени последнего принятого события, поэтому я создаю логику, которая реализует следующую проверку (payloadTimestamp ‹previousTimestamp) возвращает -1 , что соответствует моей логике, но я не уверен, плыву я по опасным водам или нет.

Могу ли я иметь дело с подобной логикой или какие другие способы существуют для работы с неупорядоченными событиями в потоках Kafka ....

Спасибо за ответы ..


person posthumecaver    schedule 05.10.2018    source источник


Ответы (1)


В настоящее время (Kafka 2.0) KTables не учитывают временные метки при обновлении, потому что предполагается, что во входной теме нет неупорядоченных данных. Причиной этого предположения является «принцип единого писателя» - предполагается, что для сжатой входной темы KTable существует только один производитель для каждого ключа, и, таким образом, не будет никаких неупорядоченных данных в отношении одиночные ключи.

Это известная проблема: https://issues.apache.org/jira/browse/KAFKA-6521

Для вашего исправления: это не на 100% правильно и не безопасно делать этот "взлом":

  • Во-первых, предположим, что у вас есть два разных сообщения с двумя разными ключами <key1, value1, 5>, <key2, value2, 3>. Вторая запись с меткой времени 3 находится позже, по сравнению с первой записью с меткой времени 5. Однако обе имеют разные ключи, и поэтому вы действительно хотите поместить вторую запись в KTable. Только если у вас есть две записи с одним и тем же ключом, вы хотите отбросить поздно поступающие данные IHMO.
  • Во-вторых, если у вас есть две записи с одним и тем же ключом, а вторая - вне очереди, и вы вылетаете перед обработкой второй, TimestampExtractor теряет метку времени первой записи. Таким образом, при перезапуске запись о нарушении порядка не удаляется.

Чтобы сделать это правильно, вам нужно будет отфильтровать «вручную» в логике вашего приложения вместо не зависящего от состояния и не зависящего от ключей TimestampExtractor. Вместо чтения данных через builder#table() вы можете прочитать их как поток и применить .groupByKey().reduce() для построения KTable. В вашей Reducer логике вы сравниваете метку времени новой и старой записи и возвращаете запись с большей меткой времени.

person Matthias J. Sax    schedule 05.10.2018