Мы обновляем нашу реализацию kafka до версии .9 и используем новый потребительский java API для создания потребителя. Я использую приведенный ниже код для потребителя, и мы используем установку темы для потребителя, как в LINE A и LINE B, это вызов нашей службы, которая обрабатывает сообщения, которые мы получаем. Теперь проблема в том, что мы получаем исключение, если обработка нашего сообщения занимает более 30 секунд.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in while consuming messages");
}
Исключение
03.03.2016 10:47:35.095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : координатор 2147483647 помечен как мертвый. 2016-03-03 10:47:35.096 ОШИБКА 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator: произошла ошибка ILLEGAL_GENERATION при фиксации смещений для группы TEST-GROUP CommitFailedException org.apache.kafka.clients.consumer.CommitFailedException : фиксация не может быть завершена из-за перебалансировки группы в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler .handle(ConsumerCoordinator.java:493) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler .onSuccess(AbstractCoordinator.java:644) на org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) на org.apache.kafka.cl ients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) на org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) на org.apache.kafka.clients.consumer. internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) в org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll( ConsumerNetworkClient.java:320) по адресу org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
Вышеупомянутое исключение возникает при совершении смещения. Любые предложения помогут спасибо