как производитель общается с реестром и что отправляет в реестр

Я пытаюсь понять это, читая документацию, но, возможно, потому, что я не продвинутый программист, я не совсем понимаю это.

Я в документации и, например, в этом примере: https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-serializer

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");

Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);

String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
  .setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
  .setF1("value1").setF2(otherRecord).build();

ProducerRecord<String, MyRecord> record
      = new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();

Я вижу здесь, что вы определяете URL-адрес реестра схемы, а затем каким-то образом производитель узнает, что он отправит контакт в реестр, чтобы предоставить некоторые метаданные о сообщениях в реестр.

Теперь я хотел бы лучше понять, как это на самом деле работает и чем обмениваются производитель и реестр (или кафка, который контактирует с реестром)?

В любом случае мой вопрос: представьте, что у меня есть запись в формате protobuf. Закладываю тот протобуф в кафку в определенной теме. теперь я хочу активировать реестр схемы, чтобы производитель просто отправил определение прототипа в реестр схемы? неужели производитель просто получает определение метаданных непосредственно из записи? будет ли он пытаться обновить любое новое сообщение в очереди? не увеличит ли это немного задержку при отправке данных в кафку?

Извините, если это все очень простые вопросы, но я просто пытаюсь получить более широкую картину, и мне не хватает этого покоя.

Спасибо за любую информацию, извините, если это уже ясно из документации. (Мне нужно это, чтобы я мог использовать ksql для десериализации своих сообщений в kafka)

с уважением,


person Miguel Costa    schedule 22.09.2020    source источник


Ответы (1)


может ли производитель просто отправить определение прототипа в реестр схемы

Это делает сериализатор, а не напрямую производитель.

MyRecord сериализуется в двоичный файл, схема отправляется по протоколу HTTP в реестр, который возвращает идентификатор, затем отправленное сообщение содержит 0x0 + ID + binary-protobuf-value

Исходный код здесь

будет ли он пытаться обновить любое новое сообщение в очереди?

Схема отправляется до отправки любых сообщений. Существующие сообщения остаются нетронутыми

не увеличит ли это немного задержку при отправке данных в кафку?

Только для первого сообщения, так как схема кэшируется

person OneCricketeer    schedule 28.09.2020