Kafka Connect с пользовательской временной меткой.extractor

У меня возникла проблема с добавлением jar в путь класса Kafka connect при попытке прочитать сообщения от Kafka до S3.

Цель состоит в том, чтобы писать сообщения в разделах на основе метки времени, которая является частью ключа в сообщении Kafka.

Короче говоря, я должен предоставить собственный экстрактор временных меток. Следуя документации здесь создал класс, реализующий интерфейс TimestampExtractor, и добавил расположение JAR к свойству plugin.path.

Проблема в том, что когда я запускаю соединение, класс не найден. Каким-то образом банка не находится в пути к классам, и я получаю

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor

Дополнительные данные:

версия: Confluent 4.0.0

connect: подключение автономно

Стартовая команда:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

Ценю любую помощь.


person Arkadiy Verman    schedule 18.12.2017    source источник


Ответы (1)


Чтобы сделать пользовательские классы извлечения меток времени доступными для вашего соединителя S3, вам потребуется следующее:

  • Добавьте банку со своими пользовательскими классами вместе с другими зависимостями коннектора. Пример:

    Сохраните под ./share/java/kafka-connect-s3, если вы хотите, чтобы это было доступно только в соединителе S3, или в ./share/java/kafka-connect-storage-common, чтобы сделать его доступным для всех соединителей приемника хранилища (в настоящее время соединители S3 и HDFS).

  • Убедитесь, что ваш пользовательский класс реализует интерфейс io.confluent.connect.storage.partitioner.TimestampExtractor.
  • Используйте полное имя класса при установке свойства timestamp.extractor в конфигурации вашего соединителя и, конечно же, убедитесь, что оно соответствует пакету, который вы определили и упаковали в банку. Например:

    timestamp.extractor=me.connectors.MyTimestampExtractor

Наконец, вы должны следовать аналогичному процессу, чтобы сделать настраиваемый разделитель доступным для вашего соединителя.

person Konstantine Karantasis    schedule 18.12.2017
comment
Спасибо за это, однако, можете ли вы пояснить, почему plugin.path не работает для загрузки пользовательского экстрактора или разделителя? - person OneCricketeer; 20.12.2017
comment
Когда вы говорите, что он не работает, вы имеете в виду, что он не загружает его из отдельного каталога в plugin.path? Это потому, что ни класс разделителя, ни класс timestamp.extractor не считаются подключаемыми модулями Connect (по крайней мере, на данный момент). Плагины Connect: Connectors, Transforms, Converters. - person Konstantine Karantasis; 21.12.2017