Использование KCL 2.x для получения DynamoDBEvents из KinesisStream

Из KinesisClientRecord (записи), которую я получил с помощью клиента KCL, я могу получить объект JSON, представляющий событие обновления ddb из потока, выполнив следующие действия:

String recordData = StandardCharsets.UTF_8.decode(record.data()).toString();
JSONObject kinesisRecordObject = new JSONObject(recordData);

Теперь я хочу получить доступ к полям, которые я получил из DynamoDB, но я не могу десериализовать его, так как это DynamoDB JSON, а не стандартный, как описано в этом посте Преобразование DynamoDB JSON в стандартный JSON с помощью Java. Приведенное там решение работает с DynamoDBStreamRecord, но не с KinesisClientRecord (с которым я работаю), скажите, пожалуйста, как выполнить этот шаг? Я обнаружил, что для KCL 1.x можно использовать адаптер KCL, но я использую KCL 2.x!

Как я могу десериализовать его, чтобы получить поля в обновлении DDB?


person Akshit Arora    schedule 24.06.2021    source источник


Ответы (1)


При использовании KCL 2 вы получаете объект CommittableRecord от Kinesis. Из этого вам нужно извлечь записи().data(), которые закодированы в UTF 8 (по умолчанию). Строка JSON, которую вы получаете после декодирования, затем вы можете преобразовать в желаемый POJO.

Что-то вроде этого

String recordData =
        new StringBuffer(StandardCharsets.UTF_8.decode(committableRecordObject.record().data()))
        .toString();
person Tejas_Garde    schedule 03.08.2021