Как получить ConsumerOffset (хранится в kafka) для группы потребителей в Java?

У меня есть несколько потребителей и производителей Kafka с разными темами. С независимым приложением я хочу отслеживать отставание потребителя кафки.

Я использую kafka 0.10.0.1, так как Kafka теперь хранит потребительские смещения в самой kafka, так как я могу прочитать то же самое.

Я могу прочитать смещение темы для каждого раздела.


person Nitesh    schedule 06.03.2017    source источник
comment
Вы можете использовать скрипт kafka-consumer-groups.sh, который может отображать информацию о группах потребителей.   -  person amethystic    schedule 06.03.2017
comment
Я использую Java, нет ли какой-нибудь вспомогательной библиотеки или способа ее получить?   -  person Nitesh    schedule 09.03.2017
comment
Попробуйте вызвать AdminClient.createSimplePlaintext("localhost:9092").listGroupOffsets   -  person amethystic    schedule 09.03.2017
comment
@amethystic listGroupOffsets() отсутствует в пакете. Можете ли вы поделиться ссылкой или полным примером кода?   -  person Nitesh    schedule 09.03.2017
comment
@Nitesh, у меня он присутствует в версии 0.10.2.1.   -  person Dani    schedule 17.05.2017


Ответы (1)


Вы можете написать такой код, чтобы получить смещения для группы:

public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {  
    Socket socket = connect(host, port);
    try {
        return send(request, apiKey, socket);
    } finally {
        socket.close();
    }
}

private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
}

private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
}

private byte[] getResponse(Socket socket) throws IOException {
     DataInputStream dis = null;
     try {
         dis = new DataInputStream(socket.getInputStream());
         byte[] response = new byte[dis.readInt()];
         dis.readFully(response);
         return response;
    } finally {
         if (dis != null) {
             dis.close();
         }
     }
}

private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
}

private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
}    

// Get offsets of a given topic for a group
public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp)).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
} 

// Get offsets of all topics for a group
public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
}
person amethystic    schedule 10.03.2017