Kafka Recover from Commit Failed Exception

У меня проблема, когда моя фиксация не выполняется, потому что poll() слишком длинный (почему это происходит, я не знаю, сообщений не было, и он просто читался/фиксировался в пустой очереди, а мой интервал опроса установлен на часы ). Затем, когда он снова нажимает read(), он по какой-то причине не перебалансируется. Однако это происходит только тогда, когда мой код работает на bluemix, локально, когда я воспроизвожу исключение, следующее чтение() вызывает перебалансировку.

Каков правильный способ восстановления после CommitFailedException? Должен ли я закрыть () и воссоздать моего потребителя? Или вызов read() должен перебалансировать и позволить мне продолжить?


person kyl    schedule 20.07.2017    source источник
comment
Какую версию вы используете, так как поведение сердцебиения было значительно изменено в 0.10.x и выше   -  person Hans Jespersen    schedule 29.07.2017
comment
Мы используем 0.10.x   -  person kyl    schedule 01.08.2017
comment
0.10.0 или 0.10.1 или 0.10.2? Если 0.10.1 или выше, на что настроен ваш max.poll.interval.ms? Я спрашиваю из-за KIP-62 cwiki.apache.org/confluence/display/KAFKA/   -  person Hans Jespersen    schedule 01.08.2017
comment
Я специально использую компиляцию org.apache.kafka:kafka-clients:0.10.2.1.   -  person kyl    schedule 01.08.2017
comment
Хорошо, в этой версии у вас есть и max.poll.interval.ms, и session.timeout.ms для настройки. Значения по умолчанию: session.timeout.ms: = 10secs, max.poll.interval.ms = 5min, а также max.poll.records= 500 сообщений, поэтому вам нужно вызывать poll() не реже одного раза каждые 5 минут и вы получите до 500 сообщений для обработки в течение следующих 5 минут, прежде чем вам снова придется опросить (), иначе ваш потребитель будет исключен из группы потребителей.   -  person Hans Jespersen    schedule 01.08.2017
comment
как долго время ожидания в вашем вызове poll()? The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.   -  person Hans Jespersen    schedule 01.08.2017
comment
Вам не нужно вызывать commit, чтобы оставаться в группе. Вам нужно только вызвать опрос(). Даже если он вернет 0 записей, он все равно сохранит вас в группе потребителей.   -  person Hans Jespersen    schedule 01.08.2017
comment
Я вызываю фиксацию, чтобы мое смещение не удалялось брокером, если у меня нет сообщений через 24 часа (я не могу изменить параметр 24 часа по умолчанию, установленный службой концентратора сообщений). Я вызываю опрос каждые 5 секунд, если нет сообщений, и если есть сообщение для обработки, я даже добавил поток сердцебиения в пустой опрос () на всякий случай. Поэтому независимо от того, почему меня выгнали, я хочу иметь план повторного подключения, но не знаю, как правильно восстановить соединение с потребителем, с помощью close() и воссоздания, или если просто вызвать poll() достаточно   -  person kyl    schedule 03.08.2017
comment
Итак, на самом деле есть два вопроса, на которые нужно ответить. 1) если вы получаете исключение с ошибкой фиксации, каков правильный способ восстановления 2) почему (или) вы выбываете из группы в первую очередь. На оба ответа важно ответить, потому что, если вы так часто вызываете опрос и параметры установлены правильно, ваш потребитель никогда не должен быть выгнан. Если вы ответите только на вопрос № 1, вы будете неоднократно присоединяться к группе и выходить из нее и снова и снова вызывать перебалансировку, что является плохой практикой для Кафки. Однако я опубликую ответ на № 1 в качестве первого шага.   -  person Hans Jespersen    schedule 03.08.2017
comment
Да, вы совершенно правы. Я хочу знать о 1), чтобы я мог восстановить свое производственное развертывание, пока я пытаюсь выяснить, почему 2) происходит. Спасибо за вашу помощь до сих пор :)   -  person kyl    schedule 08.08.2017


Ответы (2)


Метод commitSync будет автоматически повторяться бесконечно, поэтому, если вы получаете CommitFailedException, то это не повторяемое условие, и повторный вызов фиксации вряд ли поможет. Вы получаете это исключение, потому что ваш потребитель был исключен из группы потребителей.

Если вы использовали commitAsync для фиксации смещений, повторные попытки не выполняются автоматически, и вы можете получить RetriableCommitFailedException, чтобы указать на потенциально временную ошибку, для которой вы можете вручную повторить попытку фиксации еще раз. Похоже, это не то, что происходит в вашем случае, но я включаю это для полноты этого ответа.

Как только ваш потребитель будет исключен из группы и вы получите это исключение CommitFailedException, вы можете просто продолжать вызывать poll() до тех пор, пока не будет завершена перебалансировка и вы не будете допущены обратно в группу потребителей (возможно, с новым набором разделов, чем раньше), и это будет продолжать.

Если ваше приложение не терпимо к условию, в котором разделы (и, следовательно, ключи), которые вы получаете, изменяются в середине потока, вам следует реализовать прослушиватель перебалансировки, который будет вызываться при изменении назначения раздела. См. http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html

Если вы просто пытаетесь обойти тот факт, что срок действия смещений истекает каждые 24 часа, вам нужно будет вызывать фиксацию по крайней мере один раз в день, чтобы поддерживать смещение в актуальном состоянии в дополнение к периодическому вызову poll(), чтобы оставаться в группе потребителей.

person Hans Jespersen    schedule 04.08.2017
comment
Спасибо. Я вызываю poll() после того, как меня выгнали, однако это всего лишь часть обычной логики, и после этого вызывается commit(). Локально это работает нормально, при развертывании кажется, что иногда poll() не запускает перебалансировку, и я продолжаю получать сообщение об ошибке, так как снова вызываю commit(). Вы знаете, должно ли быть достаточно одного опроса () или я должен держать его в каком-то цикле, а затем обнаруживать перебалансировку? - person kyl; 08.08.2017
comment
Более вероятно, что перебалансировка в Bluemix занимает больше времени, чем локально. Будет ребаланс, когда ваш потребитель будет выгнан, и еще один, когда потребитель вернется. Возможно, вам придется подождать, пока не завершится первая ребалансировка выхода, прежде чем вы сможете запустить вторую рекомбинацию. Я знаю, что в 0.11 есть некоторые улучшения, чтобы добавить задержку, чтобы позволить нескольким потребителям присоединиться одновременно, вместо того, чтобы делать кучу объединений и перебалансировок последовательно. - person Hans Jespersen; 08.08.2017
comment
Заглянул в это больше. Вам не нужно ждать завершения ребалансировки. Просто вызовите poll(), и координатор группы вызовет правильную перебалансировку, необходимую для возвращения потребителя в группу. - person Hans Jespersen; 08.08.2017
comment
Хорошо, спасибо. Кажется, что мой конкретный случай очень странный, так как я вызываю poll() после того, как меня кикают, но, похоже, это не вызывает перебалансировку, и он продолжает и commit() (что, очевидно, терпит неудачу). Это может быть какая-то странная проблема с bluemix, поскольку раньше у них были проблемы с сетью. У меня также есть rebalancelistener, где я читаю свои смещения из другого хранилища в onPartitionsAssigned и ничего не делаю в onPartitionsRevoked — может ли это быть проблемой? Когда я читаю документы, кажется, что onPartitionsRevoked используется для сохранения смещений, а не для остановки потребителя. - person kyl; 09.08.2017
comment
Похоже, вы все делаете правильно. Возможно, из-за проблем с сетью вашего потребителя выгоняют, потому что ему не хватает 10 секунд сердцебиения. Попробуйте увеличить этот интервал и посмотрите, поможет ли это. - person Hans Jespersen; 09.08.2017

@kyl, поэтому я считаю, что с клиентом kafka-java по умолчанию потребитель будет биться каждые 3 секунды, а время ожидания сеанса составляет 10 секунд, поэтому ваш потребитель должен оставаться в группе, не выходя из группы и не выполняя перебалансировку. Какое сообщение было включено в ваш CommitFailedException? Я предполагаю, что фиксация не удалась, потому что вас выгнали.

еще несколько вопросов по этому поводу:

  1. у вас есть несколько потребителей, приходящих и уходящих, и/или вы намеренно хотите использовать группы потребителей, а не только одного потребителя?

  2. что вы подразумеваете под «мой интервал опроса установлен в часах»?

  3. что вы подразумеваете под "фиксацией пустой очереди"?

Можете ли вы поделиться фрагментом кода вашего потребительского цикла, так как это может помочь лучше объяснить, что вы делаете?

person Dominic Evans    schedule 28.07.2017
comment
Код может быть немного сложным, но сначала позвольте мне ответить на ваши вопросы. Сообщение заключалось в том, что мой интервал опроса был слишком длинным. 1) Несколько потребителей, которые не спят, используются для одновременной обработки сообщений. 2) max.poll.interval.ms=1800000 3) Я вызываю commit(), даже если сообщение не прочитано, чтобы сохранить мое смещение в брокере. Независимо от того, почему я получил перебалансировку, мне все еще нужно обработать ошибку фиксации, и я хочу знать, как правильно начать повторное прослушивание брокера с потребителем, который получил эту ошибку. Спасибо - person kyl; 28.07.2017