Я использую связыватель Spring Cloud Stream Kafka для получения сообщений от Kafka. Я могу сделать свой образец работы с одним Kafka Binder, как показано ниже.
spring:
cloud:
stream:
kafka:
binder:
consumer-properties: {enable.auto.commit: true}
auto-create-topics: false
brokers: <broker url>
bindings:
consumer:
destination: some-topic
group: testconsumergroup
consumer:
concurrency: 1
valueSerde: JsonSerde
producer:
destination: some-other-topic
producer:
valueSerde: JsonSerde
Обратите внимание, что здесь обе привязки относятся к одному и тому же брокеру Kafka. Однако у меня есть ситуация, когда мне нужно опубликовать тему в каком-то кластере Kafka, а также использовать данные из другой темы в другом кластере Kafka. Как мне изменить мою конфигурацию, чтобы иметь возможность связываться с разными кластерами Kafka?
Я пробовал что-то вроде этого
spring:
cloud:
stream:
binders:
defaultbinder:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
kafka1:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
bindings:
consumer:
binder: kafka1
destination: some-topic
group: testconsumergroup
consumer:
concurrency: 1
valueSerde: JsonSerde
producer:
binder: defaultbinder
destination: some-topic
producer:
valueSerde: JsonSerde
kafka:
binder:
consumer-properties: {enable.auto.commit: true}
auto-create-topics: false
brokers: <cluster1-brokers>
а также
spring:
cloud:
stream:
binders:
defaultbinder:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
kafka1:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
kafka:
bindings:
consumer:
binder: kafka1
destination: some-topic
group: testconsumergroup
consumer:
concurrency: 1
valueSerde: JsonSerde
producer:
binder: defaultbinder
destination: some-topic
producer:
valueSerde: JsonSerde
kafka:
binder:
consumer-properties: {enable.auto.commit: true}
auto-create-topics: false
brokers: <cluster1-brokers>
Но оба они, похоже, не работают. Первая конфигурация кажется недействительной. Для второй конфигурации я получаю следующую ошибку
Caused by: java.lang.IllegalStateException: A default binder has been requested, but there is more than one binder available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : kafka1,defaultbinder, and no default binder has been set.
Я использую зависимость org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.1.RELEASE и Spring Boot 2.2.6
Пожалуйста, дайте мне знать, как настроить несколько привязок для Kafka с помощью Spring Cloud Stream
Обновить
Пробовал эту конфигурацию ниже
spring:
cloud:
stream:
binders:
kafka2:
type: kafka
environment:
spring.cloud.stream.kafka.binder.brokers: <cluster2-brokers>
kafka1:
type: kafka
environment:
spring.cloud.stream.kafka.binder.brokers: <cluster1-brokers>
bindings:
consumer:
destination: <some-topic>
binder: kafka1
group: testconsumergroup
content-type: application/json
nativeEncoding: true
consumer:
concurrency: 1
valueSerde: JsonSerde
producer:
destination: some-topic
binder: kafka2
contentType: application/json
nativeEncoding: true
producer:
valueSerde: JsonSerde
Потоки сообщений и привязка EventHubBinding выглядят следующим образом
public interface MessageStreams {
String PRODUCER = "producer";
String CONSUMER = "consumer;
@Output(PRODUCER)
MessageChannel producerChannel();
@Input(CONSUMER)
SubscribableChannel consumerChannel()
}
@EnableBinding(MessageStreams.class)
public class EventHubStreamsConfiguration {
}
Мой класс продюсера выглядит так, как показано ниже
@Component
@Slf4j
public class EventPublisher {
private final MessageStreams messageStreams;
public EventPublisher(MessageStreams messageStreams) {
this.messageStreams = messageStreams;
}
public boolean publish(CustomMessage event) {
MessageChannel messageChannel = getChannel();
MessageBuilder messageBuilder = MessageBuilder.withPayload(event);
boolean messageSent = messageChannel.send(messageBuilder.build());
return messageSent;
}
protected MessageChannel getChannel() {
return messageStreams.producerChannel();
}
}
А потребительский класс выглядит так, как показано ниже
@Component
@Slf4j
public class EventHandler {
private final MessageStreams messageStreams;
public EventHandler(MessageStreams messageStreams) {
this.messageStreams = messageStreams;
}
@StreamListener(MessageStreams.CONSUMER)
public void handleEvent(Message<CustomMessage> message) throws Exception
{
// process the event
}
@Override
@ServiceActivator(inputChannel = "some-topic.testconsumergroup.errors")
protected void handleError(ErrorMessage errorMessage) throws Exception {
// handle error;
}
}
Я получаю указанную ниже ошибку при попытке опубликовать и использовать сообщения из моего теста.
Dispatcher has no subscribers for channel 'application.producer'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[104], headers={contentType=application/json, timestamp=1593517340422}]
Я что-нибудь упускаю? Для одного кластера я могу публиковать и потреблять сообщения. Проблема возникает только с несколькими привязками кластера