Потребитель не получает сообщения после перезапуска производителя / потребителя kafka

У нас есть один производитель, один потребитель и один раздел. Оба потребителя / производителя являются приложениями с весенней загрузкой. Клиентское приложение работает на моем локальном компьютере, а продюсер вместе с kafka и zookeeper - на удаленном компьютере.

Во время разработки я повторно развернул свое приложение производителя с некоторыми изменениями. Но после этого мой потребитель не получает никаких сообщений. Я попытался перезапустить потребителя, но безуспешно. В чем может быть проблема и / или как ее решить?

Конфигурация потребителя:

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

Конфигурация производителя:

cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

РЕДАКТИРОВАТЬ2:

Через 5 минут пользовательское приложение умирает за следующим исключением:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

person Somesh Gupta    schedule 12.09.2017    source источник
comment
Похоже на довольно простой сценарий. Не могли бы вы поделиться этим приложением где-нибудь на GitHub, чтобы мы могли воспроизвести проблему локально?   -  person Artem Bilan    schedule 12.09.2017
comment
@ArtemBilan Мне очень жаль, но я не могу поделиться своим кодом. Какие детали вам нужны, чтобы предложить решение?   -  person Somesh Gupta    schedule 12.09.2017
comment
У меня нет идей без кода. Может можно поделиться конфигами потребителя и производителя? И да, я знаю, что вы не можете поделиться всем приложением, но, по крайней мере, можете придумать для нас какое-нибудь простое приложение Spring Boot ...   -  person Artem Bilan    schedule 12.09.2017
comment
@ArtemBilan добавил конфиги.   -  person Somesh Gupta    schedule 12.09.2017
comment
Хорошо. Спасибо! Так хорошо. Что такое версия Kafka Binder? Что вы имеете в виду под перераспределением? Как это сделать локально? Приложение SCSt - это микросервис. Я смущен.   -  person Artem Bilan    schedule 12.09.2017
comment
Включение ведения журнала DEBUG - это всегда хорошее место для начала.   -  person Gary Russell    schedule 12.09.2017


Ответы (2)


Посмотрите, содержит ли приведенное выше предложение об DEBUG дополнительную информацию. Похоже, вы получаете исключение тайм-аута от KafkaTopicProvisioner. Но это происходит, когда вы перезапускаете потребителя, я полагаю. Похоже, у потребителя есть проблемы с общением с брокером, и вам нужно выяснить, что там происходит.

person sobychacko    schedule 12.09.2017

Что ж, похоже, что уже есть ошибка сообщается с spring-cloud-stream-binder-kafka о том, что свойство resetOffset не имеет никакого эффекта. Следовательно, на потребителе всегда запрашивались сообщения со смещением latest.

Как упоминалось в отношении проблемы с git, единственный обходной путь - исправить это с помощью инструмента командной строки kafka для потребителей.

person Somesh Gupta    schedule 14.09.2017