Контекст
Я закодировал пару небольших коннекторов Kafka Connect. Один, который просто генерирует случайные данные каждую секунду, а другой, регистрирует их в консоли. Они интегрированы с реестром схем, поэтому данные сериализован с помощью Avro.
Я развернул их в локальной среде Kafka, используя образ Docker fast-data-dev, предоставленный Landoop а>
Базовая настройка работает и каждую секунду выдает сообщение, которое регистрируется.
Однако я хочу изменить тему название стратегии. По умолчанию генерируются две темы:
${topic}-key
${topic}-value
В соответствии с моим вариантом использования мне нужно будет сгенерировать события с разными схемами, которые будут относиться к одной и той же теме. Поэтому мне нужны следующие имена субъектов:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
Согласно документам мои потребности вписываются в TopicRecordNameStrategy
Что я пробовал
Создаю объект avroData
для отправки значений для подключения:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
и использовать его впоследствии для создания объектов ответа SourceRecord
В документации указано, что для использования реестра схем в Kafka Connect Мне нужно установить некоторые свойства в конфигурации коннектора. Поэтому при создании я их добавляю:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Проблема
Соединитель, кажется, игнорирует эти свойства и продолжает использовать старые темы ${topic}-key
и ${topic}-value
.
Вопрос
Kafka Connect должен поддерживать различные тематические стратегии. Мне удалось обойти проблему, написав свою собственную версию _ 11_ и жестко указать, что выбранная стратегия - именно та, которая мне нужна. Однако это не выглядит хорошим подходом, а также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector. Я продублировал тему, так что есть версия со старым названием (${topic}-key
), и она работает
Как правильно указать стратегию объекта для Kafka Connect?