Kafka Connect не работает с предметными стратегиями

Контекст

Я закодировал пару небольших коннекторов Kafka Connect. Один, который просто генерирует случайные данные каждую секунду, а другой, регистрирует их в консоли. Они интегрированы с реестром схем, поэтому данные сериализован с помощью Avro.

Я развернул их в локальной среде Kafka, используя образ Docker fast-data-dev, предоставленный Landoop

Базовая настройка работает и каждую секунду выдает сообщение, которое регистрируется.

Однако я хочу изменить тему название стратегии. По умолчанию генерируются две темы:

  • ${topic}-key
  • ${topic}-value

В соответствии с моим вариантом использования мне нужно будет сгенерировать события с разными схемами, которые будут относиться к одной и той же теме. Поэтому мне нужны следующие имена субъектов:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

Согласно документам мои потребности вписываются в TopicRecordNameStrategy

Что я пробовал

Создаю объект avroData для отправки значений для подключения:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

и использовать его впоследствии для создания объектов ответа SourceRecord

В документации указано, что для использования реестра схем в Kafka Connect Мне нужно установить некоторые свойства в конфигурации коннектора. Поэтому при создании я их добавляю:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Проблема

Соединитель, кажется, игнорирует эти свойства и продолжает использовать старые темы ${topic}-key и ${topic}-value.

Вопрос

Kafka Connect должен поддерживать различные тематические стратегии. Мне удалось обойти проблему, написав свою собственную версию _ 11_ и жестко указать, что выбранная стратегия - именно та, которая мне нужна. Однако это не выглядит хорошим подходом, а также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector. Я продублировал тему, так что есть версия со старым названием (${topic}-key), и она работает

Как правильно указать стратегию объекта для Kafka Connect?


person Pelocho    schedule 28.11.2018    source источник


Ответы (1)


Вам не хватает префиксов key.converter и value.converter, чтобы конфигурация передавалась в конвтер. Так что вместо:

key.subject.name.strategy
value.subject.name.strategy

ты хочешь:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

Источник https://docs.confluent.io/current/connect/managing/configuring.html:

Чтобы передать параметры конфигурации в преобразователи ключей и значений, поставьте перед ними префикс key.converter. или value.converter., как в рабочей конфигурации при определении преобразователей по умолчанию. Обратите внимание, что они используются только в том случае, если соответствующая конфигурация преобразователя указана в свойствах key.converter или value.converter.

person Robin Moffatt    schedule 28.11.2018
comment
Ты прав. Я не нашел ничего, связанного с этим, в документации (может, я не достаточно покопался). У вас есть ссылка на документацию, в которой говорится об этом? Это завершит ответ - person Pelocho; 28.11.2018
comment
Отредактированный вопрос, чтобы включить ссылку - person Robin Moffatt; 28.11.2018
comment
Для контекста, это не работало до Confluent 4.1.3, AFAIK. github.com/confluentinc/schema-registry/pull/801 - person OneCricketeer; 19.09.2019