Как обновить разделы темы внутреннего журнала изменений при обновлении счетчика разделов исходной темы?

У меня есть приложение, в котором я использую соединение Kstream-Kstream и соединение Ktream-Ktable. Я обновил количество разделов темы источника ввода с 4 до 16, и приложение остановилось с ошибкой ниже.

Could not create internal topics: Existing internal topic application-test-processor-KSTREAM-JOINTHIS-0000000009-store-changelog has invalid partitions. Expected: 16 Actual: 4. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. Retry #3

Как обновить счетчик разделов темы внутреннего журнала изменений при обновлении счетчика разделов исходной темы?

Примечание: мы используем версию kafka: 0.10.2.1

Я посмотрел на инструмент сброса приложений по этой ссылке: https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html, но не сообщает, как обновить раздел журнала изменений.

Заранее спасибо.


person Nar123    schedule 18.04.2018    source источник


Ответы (1)


На самом деле рекомендуется использовать инструмент сброса.

Состояние вашего приложения сегментировано в зависимости от количества входных разделов. Изначально это было 4. Таким образом, изменение на 16 сломало приложение. Если вы вручную добавите разделы в тему журнала изменений (что было бы возможно и разрешило бы исключение, но не решило бы проблему на самом деле), состояние не будет перераспределено и, следовательно, будет повреждено.

Если вы используете инструмент сброса, вы удаляете все состояние и позволяете вашему приложению повторно обрабатывать все входные данные с нуля. Это позволяет Kafka Streams правильно воссоздавать состояние (теперь с 16 осколками).

person Matthias J. Sax    schedule 18.04.2018
comment
Спасибо, Мэтт. Я не хочу повторно обрабатывать сообщения. Можно ли удалить только внутренние темы с помощью инструмента сброса приложения, а затем перезапустить приложение, чтобы внутренние темы были воссозданы? - person Nar123; 23.04.2018
comment
Это возможно - вам также необходимо удалить локальное состояние через KafkaStreams#cleanup() или вручную - иначе вы не начнете с пустого состояния. - person Matthias J. Sax; 24.04.2018
comment
Привет, @ MatthiasJ.Sax, если ваш журнал изменений имеет более строгую политику хранения, чем исходная тема, как вы должны восстановить свое состояние? Вам придется переиграть события по исходной теме, не так ли? - person Bruno Bieth; 18.01.2019
comment
Темы журнала изменений не имеют срока хранения, но используют сжатие журнала. - И да, вы можете повторно обрабатывать только то, что все еще доступно в ваших входных темах. Это означает, что если ваше состояние содержит данные, которые нельзя пересчитать из-за отсутствия входных данных, вы потеряете часть своего состояния. - person Matthias J. Sax; 18.01.2019