Как обрабатывать несколько записей в Kafka Consumer?

Привет, я работаю над Confluent Kafka Consumer. У меня есть несколько записей в моем брокере. Я хочу обработать все записи сейчас. Ниже моя реализация Consumer.

public ConsumeResult<string, GenericRecord> Consume(string topic)
    {
      ConsumeResult<string, GenericRecord> result;
      try
      {
        result = consumer.Consume();
        Commit(result);
        return result;
      }
      catch (Exception e)
      {
        this.logger.Error("KafkaClient", $"Error sending message '{e.Message}'");
        return null;
      }
    }

Результат возвращен от потребителя Если внутри брокера есть несколько записей, то одно событие/сообщение я получу за время, используя GenericRecord. Если записей несколько, то как эффективно обращаться с потребителем? Любая помощь будет оценена по достоинству. Спасибо


person Niranjan    schedule 16.07.2019    source источник


Ответы (1)


Вы бы просто зациклились. См. примеры

https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AvroGeneric/Program.cs

while (true)
{
    try
    {
        var consumeResult = consumer.Consume(cts.Token);

        Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}
person OneCricketeer    schedule 16.07.2019
comment
Спасибо за Ваш ответ. Если есть несколько записей, тогда, если я поставлю цикл while, тогда он выберет несколько записей, верно? - person Niranjan; 16.07.2019
comment
Это будет продолжать потреблять, да. Будете ли вы получать больше записей, зависит от продюсера - person OneCricketeer; 16.07.2019
comment
Итак, при выполнении var ConsumerResult = Consumer.Consume(cts.Token); он будет потреблять только одну запись, верно? и снова в следующем цикле он потребляет вторую запись? Я прав? - person Niranjan; 16.07.2019
comment
Спасибо. Это моя потребительская конфигурация. BootstrapServers = kafkaConfig.BootstrapServers, GroupId = ProductEvent Нужно ли добавлять какие-либо другие настройки? Я хочу прочитать данные из последнего чтения. - person Niranjan; 16.07.2019
comment
Я только что отладил свой код после добавления цикла while. У меня есть несколько записей в моем брокере. но вернул только одну запись. Я вижу смещение 5 (5 раз я добавлял данные). Я добавил снимок экрана. - person Niranjan; 16.07.2019