Я пытаюсь сделать так, чтобы x потребителей обращались к указанной теме в kafka, но не использовали одни и те же сообщения. Я хочу например ...
Смещение подбора потребителя 1 1 Смещение подбора потребителя 2 2 Смещение подбора потребителя 1 3 Смещение подбора потребителя 2 4
Я хочу, чтобы кафка действовал как очередь для этих двух потребителей. Я заметил конфигурацию group.id и предположил, что вы можете использовать ту же группу, и она будет обрабатывать ее соответствующим образом, но, похоже, она не работает так, как я думал.
Вот код, который я использую ...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
Я хочу иметь возможность запускать этот код в кластерной среде и иметь очередь kafka. Я хочу, чтобы он вытащил сообщение и одновременно перешел к следующему смещению, тогда следующий опрос kafka захватит следующее смещение. Это возможно? И если да, то что я делаю не так?