Как предотвратить вставку нежелательных атрибутов в сообщение Kafka с помощью приемника Kafka connect?

Я публикую сообщения в своей теме Kafka, используя приложение производителя Java, используя приведенный ниже код.

String mySchema = "{"type": "record","name": "MyData","namespace": "com.qwe.rty","doc": "MyData Schema","fields": [{"name": "f1","type": ["null", "string"],"default" : null}, {"name": "f2","type": ["null", "string"],"default" : null}, {"name": "f3","type": ["null", "string"],"default" : null}]}";


Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

props.put("schema.registry.url", "http://192.168.0.1:8081");



KafkaProducer producer = new KafkaProducer(props);

Schema.Parser parser = new Schema.Parser();

Schema schema = parser.parse(mySchema);



GenericRecord avroRecord = new GenericData.Record(schema);

avroRecord.put("f1", "data1");

avroRecord.put("f2", "data2");

avroRecord.put("f3", "data3");



ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("my_topic_1",avroRecord);



try {

    System.out.println("send from producer.serialized.avro.Sender6n");

    producer.send(record);

} catch (Exception e) {

    e.printStackTrace();

}



try{

    System.out.println("flush from producer.serialized.avro.Sender6n");

    producer.flush();

}catch(Exception e){

    e.printStackTrace();

}

Сообщения в Avro. Я использую приемник Kafka connect jdbc, чтобы вставить его в мою таблицу Oracle, используя приведенную ниже конфигурацию;

name=jdbc-sink-avro

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

topics=my_topic_1

connection.url=jdbc:oracle:thin:@192.168.0.1:1521:user01

connection.user=user_01

connection.password=user_01

auto.create=true

table.name.format=my_table_1

key.converter=io.confluent.connect.avro.AvroConverter

key.converter.schema.registry.url=http://192.168.0.1:8081

value.converter=io.confluent.connect.avro.AvroConverter

value.converter.schema.registry.url=http://192.168.0.1:8081

producer.retries=1

Это прекрасно работает. Но я не хочу, чтобы f3 вставлялся в мою таблицу, а это значит, что мне просто нужна таблица с двумя столбцами. Другими словами, я хочу предотвратить вставку f3. Как я могу это сделать?


person Alfred    schedule 29.01.2018    source источник


Ответы (1)


Используйте функцию Kafka Connect Single Message Transform (SMT). В частности, ReplaceField и blacklist.

Вы можете увидеть пример здесь ("Маскировка полей и белые/черные списки").

person Robin Moffatt    schedule 29.01.2018
comment
Спасибо @Robin, есть ли способ переименовать поле, используя эту функцию? - person Alfred; 29.01.2018
comment
Да, ознакомьтесь с документами для org.apache.kafka.connect.transforms.ReplaceFieldrenames должно делать то, что вы хотите. - person Robin Moffatt; 29.01.2018