Я работаю над средой разработки с 3 (dockerized) брокерами kafka в моей системе. У брокеров значение transaction.state.log.replication.factor равно 3.
В конфигурации потокового приложения я установил processing.guarantee как EXACTLY_ONCE, а в конфигурации потребительского приложения я установил изоляцию .level как "read_committed".
Я проверил другие конфигурации на https://docs.confluent.io/current/streams/developer-guide/config-streams.html#processing-guarantee, и я настроил свою среду в соответствии с руководством.
После минуты создания сообщения из потокового приложения, которое читает хранилище состояний и создает 100 сообщений с помощью функции context.forward (..), приложение-потребитель прекращает чтение, как если бы в назначенных разделах не было зафиксированных сообщений.
Через некоторое время приложение стрима вылетает со следующей ошибкой:
"Прерывание пакетов производителей из-за фатальной ошибки org.apache.kafka.common.errors.ProducerFencedException: производитель попытался выполнить операцию со старой эпохой. Либо есть новый производитель с тем же транзакционным идентификатором, либо брокер истек срок действия транзакции производителя. . "
Похоже, что производитель потока не может получить подтверждение, и транзакция истекает.
Редактировать 1: когда я останавливаю потоковое приложение, потребитель получает зафиксированные сообщения.