Количество отправленных Kinesis записей не равно количеству использованных записей

Мы оцениваем Kinesis, и я обнаружил следующее поведение. У меня есть простой тест с использованием Kinesis для проверки точности и базовой функциональности.

Тест создает элемент в потоке следующим образом:

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName( streamName );
    putRecordRequest.setData(ByteBuffer.wrap(event.getBytes()));
    putRecordRequest.setPartitionKey( message.getEventList().getEvents().get(0).getLicenseKey());

    UsageServiceStatistics.instance().getKinesisSent().increase();
    PutRecordResult putRecordResult = kinesisManager.getConnection().putRecord( putRecordRequest );

Затем я использую клиентскую библиотеку Amazon Kinesis (KCL) следующим образом:

@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer)
{
    logger.debug("Received a list of records for processing with size:" + records.size());

    for (Record record : records)
    {
        UsageServiceStatistics.instance().getKinesisConsumed().increase();
        logger.debug("Kinesis consumed:" + UsageServiceStatistics.instance().getKinesisConsumed());
        if (!processRecord(record))
        {
            logger.error("Couldn't process record " + record + ". Skipping the record.");
        }
    }

    checkpointManager.checkpoint(iRecordProcessorCheckpointer);
}

Я вижу расхождения между количеством произведенных и потребляемых записей. Например, при отправке серии из 2000 элементов 3 раза подряд я вижу следующее:

Kinesis sent:counter=2000
Kinesis consumed:1999

Kinesis sent:counter=4000
Kinesis consumed:counter=3994

Kinesis sent:counter=6000
Kinesis consumed:counter=5999

Почему я не вижу точно такое же количество произведенного и потребленного? Почему после второго запуска 6 элементов отсутствовали, и я получил 2006 потребленных записей только при запуске 3, хотя я ждал не менее 2 минут между запуском 2 и запуском 3.

Наконец, я сделал набор тестов перед этим с более высокой частотой контрольных точек, и тогда расхождения были еще больше? Какое правило использует Amazon KCL для запуска отправки записей потребителю? Почему он прекращает отправку и сохраняет элементы в очереди (например, со 2 по 3)? Где последний пункт из 6000 отправленных?

Спасибо заранее


person isaac.hazan    schedule 11.12.2014    source источник
comment
Вы проверили наличие исключений при записи события в Kinesis. Вы могли быть ограничены для некоторых вызовов put_record.   -  person Guy    schedule 11.12.2014
comment
Я проверял, и никаких исключений нет даже при установке уровня журнала кинезиса для отладки.   -  person isaac.hazan    schedule 11.12.2014
comment
Как вы поставили уровень журнала kinesis для отладки. Не могли бы вы опубликовать шаги/образец, который вы сделали?   -  person Sunil Gulabani    schedule 25.02.2016


Ответы (1)


Я нашел первопричину.

Это была ошибка в моем коде.

KCL создает количество обработчиков записей, равное количеству сегментов в конкретном потоке.

Однако я допустил ошибку, заставив их использовать один и тот же объект Checkpointer в многопоточной среде. Когда я исправил это, чтобы каждый процессор записи имел свой контрольный указатель, он работал отлично, и подсчеты были согласованными.

person isaac.hazan    schedule 11.12.2014