Соединитель Kafka-Connect JDBC: крошечное отображение в логическое значение

У меня есть задание Kafka-Connect, настроенное на периодический запрос таблицы MySQL и размещение сообщений в очереди. Структура этих сообщений определяется с помощью схемы Avro. У меня проблема с отображением для одного из моих столбцов.

Столбец определен как tinyint(1) в моей схеме MySQL, и я пытаюсь сопоставить его с логическим полем в моем объекте avro.

введите здесь описание изображения

{ "name": "is_active", "type": "boolean" }

Задания kafka-connect выполняются, и сообщения помещаются в очередь, но когда мое приложение, которое читает из очереди, пытается десериализовать сообщения, я получаю следующую ошибку:

org.apache.avro.AvroTypeException: Found int, expecting boolean

Я надеялся, что значение 1 или 0 может быть автоматически сопоставлено с логическим значением, но, похоже, это не так.

Я также пытался настроить свою работу для использования преобразования «Cast», но это, похоже, вызвало проблемы с другими полями в сообщении.

"transforms": "Cast", "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.Cast.spec": "is_active:boolean"

Возможно ли то, что я пытаюсь сделать, или мне придется изменить свое приложение для работы со значением int?

Вот моя полная конфигурация (я удалил некоторые другие нерелевантные поля)

Конфигурация задания Kafka Connect

{ "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "bulk", "topic.prefix": "my_topic-name", "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", "query": "select is_active from my_table", "poll.interval.ms": "30000", "transforms": "SetSchemaMetadata", "name": "job_name", "connection.url": "connectiondetailshere", "transforms.SetSchemaMetadata.schema.name": "com.my.model.name" }

Схема АВРО

{ "type": "record", "name": "name", "namespace": "com.my.model", "fields": [
{ "name": "is_active", "type": "long" } ], "connect.name": "com.my.model.name" }


person Paddyd    schedule 02.07.2019    source источник
comment
что за ошибка с кастингом?   -  person HungUnicorn    schedule 02.07.2019
comment
У меня нет точной ошибки прямо сейчас, но это вызывало проблемы с типами других полей в моей схеме, которые, казалось, преобразовывались, хотя я не указывал их внутри transforms.Cast.spec   -  person Paddyd    schedule 02.07.2019
comment
звучит для меня зарезервированными вопросами слова. у вас есть query как имя поля? затем поместите `` для покрытия, например `query`. также, если у вас есть большой ksql, было бы просто удалить некоторые части ksql и протестировать с небольшими частями, чтобы узнать, какая часть работает, например, разработчик базы данных отлаживает запрос   -  person HungUnicorn    schedule 02.07.2019
comment
Нет, у меня нет запроса в качестве имени поля. Поле is_active — это новое, которое я добавляю, поэтому я знаю, что все остальное работает. Проблема связана с этим новым полем   -  person Paddyd    schedule 02.07.2019


Ответы (1)


Вы можете сделать это либо с помощью пользовательского преобразования (идеальный вариант его использования) или напишите для этого простое потоковое приложение, например, на KSQL:

CREATE STREAM my_topic AS 
  SELECT COL1, COL2, …
         CASE WHEN is_active=1 THEN TRUE ELSE FALSE END AS is_active_bln
  FROM my_source_connect_topic;
ksql> describe my_topic;

Name                 : my_topic
 Field         | Type
-----------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 COL1          | INTEGER
 COL1          | VARCHAR
 IS_ACTIVE_BLN | BOOLEAN
----------------------------------------
person Robin Moffatt    schedule 02.07.2019
comment
из документа я так понимаю, что трансформацию можно было сделать через каст, иначе что бы сделать не получается? - person HungUnicorn; 03.07.2019