Сообщения Kafka обрабатываются повторно

У нас есть микросервисы, которые создают и принимают сообщения от Kafka с помощью spring-boot и spring-cloud-stream. Версии
:
spring -boot: 1.5.8.RELEASE
Spring-cloud-stream: Ditmars.RELEASE
Сервер Kafka: kafka_2.11-1.0.0

РЕДАКТИРОВАТЬ: Мы работаем в среде Kubernetes, используя кластер StatefulSets из 3 узлов Kafka и кластер из 3 узлов Zookeeper.

Мы столкнулись с несколькими случаями появления старых сообщений, которые повторно обрабатывались, когда эти сообщения уже были обработаны несколько дней назад.
Несколько примечаний:

  1. До этого были напечатаны следующие журналы (есть и другие похожие строки, это просто сводка)

Отмена ранее назначенных разделов [] для службы регистрации в группе
Обнаружен координатор dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 стойка: null)
Служба регистрации в группе успешно присоединена с генерацией 320

  1. Вышеупомянутые случаи отмены и переназначения разделов происходят каждые несколько часов. И лишь в некоторых случаях старые сообщения используются повторно. В большинстве случаев переназначение не вызывает потребления сообщения.
  2. Сообщения из разных разделов.
  3. На каждый раздел обрабатывается более одного сообщения.

application.yml:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-enrollment-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            enrollment-mail-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
            enroll-users-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
        bindings:
          user-enrollment-input:
            destination: enroll-users
            consumer:
              concurrency: 10
              partitioned: true
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true
          enrollment-mail-output:
            destination: send-enrollment-mail
            producer:
              partitionCount: 10
          enroll-users-output:
            destination: enroll-users
            producer:
              partitionCount: 10

Есть ли какая-то конфигурация, которую мне может не хватать? Что может вызвать такое поведение?


person Yossi Shasha    schedule 05.03.2018    source источник


Ответы (1)


Таким образом, настоящая проблема описана в следующем билете: https://issues.apache.org/jira/browse/KAFKA-3806. Использование предложенного обходного пути исправило это.

person Yossi Shasha    schedule 22.04.2018