Метод commitSync
будет автоматически повторяться бесконечно, поэтому, если вы получаете CommitFailedException
, то это не повторяемое условие, и повторный вызов фиксации вряд ли поможет. Вы получаете это исключение, потому что ваш потребитель был исключен из группы потребителей.
Если вы использовали commitAsync
для фиксации смещений, повторные попытки не выполняются автоматически, и вы можете получить RetriableCommitFailedException
, чтобы указать на потенциально временную ошибку, для которой вы можете вручную повторить попытку фиксации еще раз. Похоже, это не то, что происходит в вашем случае, но я включаю это для полноты этого ответа.
Как только ваш потребитель будет исключен из группы и вы получите это исключение CommitFailedException
, вы можете просто продолжать вызывать poll() до тех пор, пока не будет завершена перебалансировка и вы не будете допущены обратно в группу потребителей (возможно, с новым набором разделов, чем раньше), и это будет продолжать.
Если ваше приложение не терпимо к условию, в котором разделы (и, следовательно, ключи), которые вы получаете, изменяются в середине потока, вам следует реализовать прослушиватель перебалансировки, который будет вызываться при изменении назначения раздела. См. http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
Если вы просто пытаетесь обойти тот факт, что срок действия смещений истекает каждые 24 часа, вам нужно будет вызывать фиксацию по крайней мере один раз в день, чтобы поддерживать смещение в актуальном состоянии в дополнение к периодическому вызову poll(), чтобы оставаться в группе потребителей.
person
Hans Jespersen
schedule
04.08.2017
max.poll.interval.ms
? Я спрашиваю из-за KIP-62 cwiki.apache.org/confluence/display/KAFKA/ - person Hans Jespersen   schedule 01.08.2017max.poll.interval.ms
, иsession.timeout.ms
для настройки. Значения по умолчанию: session.timeout.ms: = 10secs, max.poll.interval.ms = 5min, а также max.poll.records= 500 сообщений, поэтому вам нужно вызывать poll() не реже одного раза каждые 5 минут и вы получите до 500 сообщений для обработки в течение следующих 5 минут, прежде чем вам снова придется опросить (), иначе ваш потребитель будет исключен из группы потребителей. - person Hans Jespersen   schedule 01.08.2017The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
- person Hans Jespersen   schedule 01.08.2017