Я использую kafka 0.8.2, я хочу использовать API смещения выборки и фиксации, как документ сказал:
Смещения для данной группы потребителей поддерживаются конкретным брокером, называемым координатором смещения. т. е. потребителю необходимо отправить запрос на фиксацию смещения и получить запросы к этому конкретному брокеру. Он может обнаружить текущий координатор смещения, отправив запрос метаданных потребителя.
Поэтому я отправляю ConsumerMetadataRequest
, вместо того, чтобы получать правильный ответ, я всегда получаю ConsumerCoordinatorNotAvailableCode
Посредник возвращает этот код ошибки для запросов метаданных потребителя или запросов фиксации смещения, если тема смещения еще не создана.
Если я использую версия kafka 0.8.2beta, в которой нет проблемы.
Также я использую клиент go sarama, я создал тему __consumer_offsets
перед получением метаданных. А вот и мой конфиг:
broker.id=1
port=9091
host.name=192.168.33.10
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=data/9091
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000