Как исправить поиск кэшированной схемы реестра, вызывающий ужасную производительность

Изменить: я обнаружил этот другой вопрос несколько лет назад (Как заполнить кеш в CachedSchemaRegistryClient без вызова для регистрации новой схемы?). В нем упоминается, что CachedSchemaRegistryClient должен зарегистрировать схему в реальном реестре, чтобы сделать ее кэшированной, и еще не было решения, позволяющего обойти это. Поэтому оставляю свой вопрос здесь, но хотел, чтобы об этом тоже знали.

Я работаю над программой, которая извлекает массив байтов из kafka, расшифровывает его (так что он безопасен при использовании kafka), преобразует байты в строку, строку json в объект json, ищет схему из реестра схем ( с использованием CachedSchemaRegistryClient), преобразование байтов json в универсальную запись с использованием схемы из извлеченной схемы из метаданных реестра, а затем сериализация этой универсальной записи в байты avro.

После выполнения некоторых тестов кажется, что CachedSchemaRegistyClient является основным фактором снижения производительности. Но из того, что я могу сказать, это лучший способ получить метаданные схемы. Я что-то плохо реализовал или есть другой способ сделать это, который работает с моим вариантом использования?

Вот код того, что обрабатывает все после расшифровки:

package org.apache.flink;

import avro.fullNested.FinalMessage;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import serializers.AvroFinishedMessageSerializer;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class JsonToAvroBytesParser implements FlatMapFunction<String, byte[]> {

    private transient CachedSchemaRegistryClient schemaRegistryClient;
    private transient AvroFinishedMessageSerializer avroFinishedMessageSerializer;
    private String schemaUrl;
    private Integer identityMaxCount;

    public JsonToAvroBytesParser(String passedSchemaUrl, int passedImc){
        schemaUrl = passedSchemaUrl;
        identityMaxCount = passedImc;
    }

    private void ensureInitialized() {
        if (schemaUrl.equals("")) {
            schemaUrl = "https://myschemaurl.com/";
        }
        if(identityMaxCount == null){
            identityMaxCount = 5;
        }
        if(schemaRegistryClient == null){
            schemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, identityMaxCount);
        }
        if(avroFinalMessageSerializer == null){
            avroFinalMessageSerializer = new AvroFinalMessageSerializer(FinalMessage.class);
        }
    }

    @Override
    public void flatMap(String s, Collector<byte[]> collector) throws Exception {

        ensureInitialized();

        Object obj = new JSONParser().parse(s);
        JSONObject jsonObject = (JSONObject) obj;

        try {
            String headers = jsonObject.get("headers").toString();
            JSONObject body = (JSONObject) jsonObject.get("requestBody");
            if(headers != null && body != null){
                String kafkaTopicFromHeaders = "hard_coded_name-value";
                //NOTE: this schema lookup has serious performance issues.
                SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicFromHeaders);
                //TODO: need to implement recovery method if schema cannot be reached.

                JsonAvroConverter converter = new JsonAvroConverter();
                GenericRecord specificRecord = converter.convertToGenericDataRecord(body.toJSONString().getBytes(), new Schema.Parser().parse(schemaMetadata.getSchema()));
                byte[] bytesToReturn = avroFinishedMessageSerializer.serializeWithSchemaId(schemaMetadata, specificRecord);

                collector.collect(bytesToReturn);
            }
            else {
                System.out.println("json is incorrect.");
            }
        } catch (Exception e){
            System.out.println("json conversion exception caught");
        }
    }
}

Спасибо за любую помощь заранее!


person Jicaar    schedule 26.06.2019    source источник


Ответы (1)


Похоже, что метод getLatestSchemaMetadata не использует кеш. Если вы хотите, чтобы ваши вызовы использовали кеш для повышения производительности, возможно, вы можете реорганизовать свою программу, чтобы использовать один из других методов, которые используют кеш, возможно, схему поиска по идентификатору или зарегистрировать схему по имени со строкой определения.

У меня возникли проблемы с поиском документации по Java (или Python, или C++), подтверждающей, что именно так работает SchemaRegistry (попробовал здесь). Но в документах .Net говорится, что, по крайней мере, в этом клиентском API метод getLatest имеет вид не кэшируется.

person Ryan    schedule 05.05.2020