KSQL создает таблицу из потока для последних данных

У меня есть тема "Клиенты", и я создал для нее ленту.

CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');

Мой продюсер для темы customers генерирует целочисленный ключ и значение json. Но когда я вижу, что ключ строки устанавливается на какое-то двоичное значение

ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}

Теперь, если я создам таблицу, в результате будет одна строка (возможно, потому что ключ строки такой же ??)

CREATE TABLE customers (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');

После поиска в Интернете я наткнулся на эту статью https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key и создал новый поток путем перераспределения по ключу

CREATE STREAM customers_stream2 AS \
 SELECT * FROM customers_stream \
 PARTITION BY customerId;

Итак, как мне создать таблицу с последними значениями данных о клиентах?

создание таблицы из потока приводит к ошибке

CREATE TABLE customers_2_table_active AS
  SELECT CUSTOMERID,ISACTIVE
  FROM customers_stream2;

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

Мне нужно последнее значение различных строк, чтобы другой микросервис мог запросить новую таблицу.

заранее спасибо


person Abhishek    schedule 05.10.2019    source источник


Ответы (1)


Смена ключей кажется правильным подходом, однако вы не можете напрямую преобразовать STREAM в TABLE.

Обратите внимание, что ваш поток с измененным ключом customers_stream2 записан в соответствующую тему. Следовательно, вы сможете создать новый TABLE из темы потока, чтобы получить последнее значение для каждого ключа.

person Matthias J. Sax    schedule 07.10.2019
comment
в качестве примечания, знаете ли вы, почему сгенерированный rowkey имеет такое значение? это ошибка в кафке? - person Abhishek; 07.10.2019
comment
Что вы подразумеваете под такой ценностью? - person Matthias J. Sax; 08.10.2019
comment
РОУКИ: \ u0000 \ u0000 \ u0003�. Разве ключ сообщения не хранится как rowkey? Если нет, то для чего нужен ключ строки? - person Abhishek; 08.10.2019
comment
Атм, ROWKEY в KSQL всегда String типа - может, вы просто неправильно десериализуете его? - person Matthias J. Sax; 09.10.2019