Как создать поля потока KSQLdb из вложенного объекта JSON

У меня есть тема, по которой я отправляю json в следующем формате:

 {
  "schema": {
   "type": "string",
   "optional": true
  },
  "payload": “CustomerData{version='1', customerId=‘76813432’,      phone=‘76813432’}”
} 

и я хотел бы создать поток с помощью customerId и phone, но я не уверен, как определить поток с точки зрения вложенного объекта json. (отредактировано)

CREATE  STREAM customer (
    payload.version VARCHAR,
    payload.customerId VARCHAR,
    payload.phone VARCHAR
  ) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
  );    

Было бы что-то подобное? Как отменить ссылку на вложенный объект при определении поля потоков?

На самом деле приведенное выше не работает для определений полей, в нем говорится:

Caused by: line 2:12: 
extraneous input '.' expecting {'EMIT', 'CHANGES',
'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY',

person Roger Alkins    schedule 15.09.2020    source источник


Ответы (1)


Применение функции extractjsonfield

Существует функция ksqlDB под названием extractjsonfield что вы можете использовать.

Во-первых, вам нужно извлечь поля schema и payload:

CREATE STREAM customer (
  schema VARCHAR,
  payload VARCHAR
) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
); 

Затем вы можете выбрать вложенные поля в json:

SELECT EXTRACTJSONFIELD(payload, '$.version') AS version FROM customer;

Однако похоже, что ваши данные полезной нагрузки не имеют допустимого формата JSON.


Применение схемы STRUCT

Если вся ваша полезная нагрузка закодирована как строка JSON, это означает, что ваши данные выглядят так:

{
  "schema": {
   "type": "string",
   "optional": true
  },
  "payload": {
    "version"="1",
    "customerId"="76813432",
    "phone"="76813432"
  }
} 

вы можете определить STRUCT, как показано ниже:

CREATE STREAM customer (
  schema STRUCT<
    type VARCHAR,
    optional BOOLEAN>,
  payload STRUCT<
    version VARCHAR,
    customerId VARCHAR,
    phone VARCHAR>
) 
WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);

и, наконец, ссылку на отдельные поля можно сделать следующим образом:

CREATE STREAM customer_analysis AS
SELECT
  payload->version as VERSION,
  payload->customerId as CUSTOMER_ID,
  payload->phone as PHONE
FROM customer
EMIT CHANGES;
person mike    schedule 15.09.2020
comment
Хорошо, вы можете уточнить, как EXTRACTJSONFIELD (message, '$ .payload.version') будет применяться в определении потока - person Roger Alkins; 15.09.2020
comment
Я хотел бы увидеть, как это сделать с помощью как структуры, так и подхода EXTRACTJSONFIELD. - person Roger Alkins; 15.09.2020
comment
А также, если в полезной нагрузке снова будет подобъект, можно ли снова применить STRUCT? - person Roger Alkins; 15.09.2020
comment
Итак, чтобы прояснить, мне нужно знать, является ли, например, сообщение встроенным объектом во время команды CREATE STREAM? и как именно в этом случае будет установлено имя поля потока? - person Roger Alkins; 15.09.2020
comment
Также проверьте, эквивалентны ли STRING и VARCHAR этому определению, как указано в вашем ответе? - person Roger Alkins; 15.09.2020
comment
Привет, @RogerAlkins, извините, что потребовалось время, чтобы ответить, и надеюсь, что вы уже решили свою проблему. Если вы хотите взглянуть на часть с использованием STRUCT, я только что ее протестировал, и она работает нормально. Вы также можете иметь подобъекты внутри SRUCT (что означает, что у вас может быть СТРУКТУРА СТРУКТОВ). - person mike; 29.09.2020