Я пытаюсь понять это, читая документацию, но, возможно, потому, что я не продвинутый программист, я не совсем понимаю это.
Я в документации и, например, в этом примере: https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-serializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);
String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
.setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
.setF1("value1").setF2(otherRecord).build();
ProducerRecord<String, MyRecord> record
= new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();
Я вижу здесь, что вы определяете URL-адрес реестра схемы, а затем каким-то образом производитель узнает, что он отправит контакт в реестр, чтобы предоставить некоторые метаданные о сообщениях в реестр.
Теперь я хотел бы лучше понять, как это на самом деле работает и чем обмениваются производитель и реестр (или кафка, который контактирует с реестром)?
В любом случае мой вопрос: представьте, что у меня есть запись в формате protobuf. Закладываю тот протобуф в кафку в определенной теме. теперь я хочу активировать реестр схемы, чтобы производитель просто отправил определение прототипа в реестр схемы? неужели производитель просто получает определение метаданных непосредственно из записи? будет ли он пытаться обновить любое новое сообщение в очереди? не увеличит ли это немного задержку при отправке данных в кафку?
Извините, если это все очень простые вопросы, но я просто пытаюсь получить более широкую картину, и мне не хватает этого покоя.
Спасибо за любую информацию, извините, если это уже ясно из документации. (Мне нужно это, чтобы я мог использовать ksql для десериализации своих сообщений в kafka)
с уважением,