Множественная привязка Spring Cloud Stream Kafka

Я использую связыватель Spring Cloud Stream Kafka для получения сообщений от Kafka. Я могу сделать свой образец работы с одним Kafka Binder, как показано ниже.

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <broker url>
      bindings:
        consumer:
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-other-topic
          producer:
            valueSerde: JsonSerde

Обратите внимание, что здесь обе привязки относятся к одному и тому же брокеру Kafka. Однако у меня есть ситуация, когда мне нужно опубликовать тему в каком-то кластере Kafka, а также использовать данные из другой темы в другом кластере Kafka. Как мне изменить мою конфигурацию, чтобы иметь возможность связываться с разными кластерами Kafka?

Я пробовал что-то вроде этого

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          binder: defaultbinder
          destination: some-topic
          producer:
            valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>

а также

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      kafka:
        bindings:
          consumer:
            binder: kafka1
            destination: some-topic
            group: testconsumergroup
            consumer:
              concurrency: 1
              valueSerde: JsonSerde
          producer:
            binder: defaultbinder
            destination: some-topic
            producer:
              valueSerde: JsonSerde      
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <cluster1-brokers>

Но оба они, похоже, не работают. Первая конфигурация кажется недействительной. Для второй конфигурации я получаю следующую ошибку

Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka1,defaultbinder, and no default binder has been set.

Я использую зависимость org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE и Spring Boot 2.2.6

Пожалуйста, дайте мне знать, как настроить несколько привязок для Kafka с помощью Spring Cloud Stream

Обновить

Пробовал эту конфигурацию ниже

spring:
  cloud:
    stream:
      binders:
        kafka2:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
      bindings:
        consumer:
          destination: <some-topic>
          binder: kafka1
          group: testconsumergroup
          content-type: application/json
          nativeEncoding: true
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-topic
          binder: kafka2
          contentType: application/json
          nativeEncoding: true
          producer:
            valueSerde: JsonSerde

Потоки сообщений и привязка EventHubBinding выглядят следующим образом

public interface MessageStreams {
  String PRODUCER = "producer";
  String CONSUMER = "consumer;

  @Output(PRODUCER)
  MessageChannel producerChannel();

  @Input(CONSUMER)
  SubscribableChannel consumerChannel()
}

@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}

Мой класс продюсера выглядит так, как показано ниже

@Component
@Slf4j
public class EventPublisher {
  private final MessageStreams messageStreams;

  public EventPublisher(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  public boolean publish(CustomMessage event) {
    MessageChannel messageChannel = getChannel();
    MessageBuilder messageBuilder = MessageBuilder.withPayload(event);
    boolean messageSent = messageChannel.send(messageBuilder.build());
    return messageSent;
  }

  protected MessageChannel getChannel() {
    return messageStreams.producerChannel();
  }
}

А потребительский класс выглядит так, как показано ниже

@Component
@Slf4j
public class EventHandler {
  private final MessageStreams messageStreams;

  public EventHandler(MessageStreams messageStreams) {
    this.messageStreams = messageStreams;
  }

  @StreamListener(MessageStreams.CONSUMER)
  public void handleEvent(Message<CustomMessage> message) throws Exception 
  {
    // process the event
  }

  @Override
  @ServiceActivator(inputChannel = "some-topic.testconsumergroup.errors")
  protected void handleError(ErrorMessage errorMessage) throws Exception {
    // handle error;
  }
}

Я получаю указанную ниже ошибку при попытке опубликовать и использовать сообщения из моего теста.

Dispatcher has no subscribers for channel 'application.producer'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[104], headers={contentType=application/json, timestamp=1593517340422}]

Я что-нибудь упускаю? Для одного кластера я могу публиковать и потреблять сообщения. Проблема возникает только с несколькими привязками кластера


comment
Пробовали этот вариант? stackoverflow.com/a/60218943/1927543   -  person Karthikeyan    schedule 28.06.2020
comment
Вы смотрели этот образец? github.com/spring-cloud/spring-cloud-stream-samples/tree/master/   -  person sobychacko    schedule 29.06.2020