kafka обновляется до .9 с новым потребительским API

Мы обновляем нашу реализацию kafka до версии .9 и используем новый потребительский java API для создания потребителя. Я использую приведенный ниже код для потребителя, и мы используем установку темы для потребителя, как в LINE A и LINE B, это вызов нашей службы, которая обрабатывает сообщения, которые мы получаем. Теперь проблема в том, что мы получаем исключение, если обработка нашего сообщения занимает более 30 секунд.

    Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("auto.offset.reset", "earliest");
            props.put("heartbeat.interval.ms", "1000");
            props.put("receive.buffer.bytes", 10485760);
            props.put("fetch.message.max.bytes", 5242880);
            props.put("enable.auto.commit", false);
    //with partition assigned to consumer


            KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
           // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
            //consumer.assign(Arrays.asList(partition0));
            //assign topic to consumer without partition
//LINE A
            consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                try {
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    consumeFromQueue(records);//LINE B
                    consumer.commitSync();

                } catch (CommitFailedException e) {
                    e.printStackTrace();
                    System.out.println("CommitFailedException");
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Exception in while consuming messages");
                }

Исключение

03.03.2016 10:47:35.095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : координатор 2147483647 помечен как мертвый. 2016-03-03 10:47:35.096 ОШИБКА 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator: произошла ошибка ILLEGAL_GENERATION при фиксации смещений для группы TEST-GROUP CommitFailedException org.apache.kafka.clients.consumer.CommitFailedException : фиксация не может быть завершена из-за перебалансировки группы в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler .handle(ConsumerCoordinator.java:493) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler .onSuccess(AbstractCoordinator.java:644) на org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) на org.apache.kafka.cl ients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) на org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) на org.apache.kafka.clients.consumer. internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) в org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll( ConsumerNetworkClient.java:320) по адресу org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)

Вышеупомянутое исключение возникает при совершении смещения. Любые предложения помогут спасибо


person Sunny Gupta    schedule 09.03.2016    source источник


Ответы (2)


Это происходит потому, что новый потребитель является однопоточным, и единственный способ, с помощью которого он может поддерживать пульсацию с группой потребителей, — это опрос или фиксация смещения, через 30 секунд координатор группы помечает вашего потребителя как мертвого и вызывает группу. перебалансировать. В этой ситуации вы можете либо увеличить request.timeout.ms, либо разделить работу по потреблению и обработке между двумя потоками.

person Nautilus    schedule 09.03.2016
comment
Спасибо за ответ, я попытался добавить request.timeout.ms к 70000, это дало мне такое же исключение, хотя обработка сообщения заняла 30000. - person Sunny Gupta; 10.03.2016
comment
Не уверен, почему это не имеет никакого эффекта. Я даже пытался установить для session.timeout.ms значение 70000, но это дало мне исключение, говорящее org.apache.kafka.common.errors.ApiException: время ожидания сеанса не находится в допустимом диапазоне. Я думал о вашем втором предложении, но если что-то пойдет не так при обработке потоков, как я с этим справлюсь? добавить его снова в тему как новое сообщение и отменить изменения, вызванные сообщением до исключения? - person Sunny Gupta; 10.03.2016
comment
также увеличьте параметр group.max.session.timeout.ms - person Nautilus; 10.03.2016
comment
Пробовал ставить. Журналы без каких-либо эффектов выглядят следующим образом: - 2016-03-10 16:43:26.339 WARN 10376 --- [ask-scheduler-4] o.a.k.clients.consumer.ConsumerConfig : Конфигурация group.max.session.timeout.ms = 40000 было предоставлено, но не является известной конфигурацией. 2016-03-10 16:43:26.341 ИНФОРМАЦИЯ 10376 --- [ask-scheduler-3] o.a.kafka.common.utils.AppInfoParser: версия Kafka: 0.9.0.1 2016-03-10 16:43:26.342 ИНФОРМАЦИЯ 10376 - -- [ask-scheduler-3] o.a.kafka.common.utils.AppInfoParser: Kafka commitId: 23c69d62a0cabf06 - person Sunny Gupta; 10.03.2016
comment
Я передаю group.max.session.timeout.ms как свойство, как и другие свойства. - person Sunny Gupta; 10.03.2016
comment
вам нужно установить его в файле server.properties, взгляните на kafka.apache.org/documentation .html - person Nautilus; 11.03.2016
comment
@nautilus единственный способ, которым он может поддерживать пульсацию с группой потребителей, - это опрос или фиксация смещения. Вы ошибаетесь здесь. он может поддерживать пульсацию с группой потребителей только путем опроса. - person Black_Rider; 07.07.2016
comment
@Black_Rider У меня сложилось впечатление, что когда вы выполняете коммит под капотом, он также вызывает опрос, следовательно, поддерживает пульс. - person Nautilus; 07.07.2016

Вы можете ограничить количество сообщений, возвращаемых poll(), установив

max.partition.fetch.bytes

до некоторого подходящего порога, который больше вашего самого большого сообщения, но настолько низок, что вы получите меньше сообщений за опрос.

Kafka 0.10.x поддерживает явное ограничение количества сообщений, возвращаемых клиенту, путем установки

max.poll.records
person nilsmagnus    schedule 03.02.2017