Scala и Avro: проблема с регистрацией схемы io.confluent

Я использую scala 2.12, и в моем build.sbt есть следующие зависимости.

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.0"

libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "3.1.1"

libraryDependencies += "io.confluent" % "common-config" % "3.1.1"

libraryDependencies += "io.confluent" % "common-utils" % "3.1.1"

libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % "3.1.1"

Благодаря этому сообществу я могу преобразовать свои необработанные данные в требуемый формат avro.

Нам нужно использовать сливающиеся библиотеки для сериализации и отправки данных в темы Kafka.

Я использую следующие свойства и запись avro.

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    properties.put("schema.registry.url", "http://myschemahost:8081")

Просто показываю необходимый фрагмент кода для краткости.

val producer = new KafkaProducer[String, GenericData.Record](properties)
val schema = new Schema.Parser().parse(new File(schemaFileName))

var avroRecord = new GenericData.Record(schema)
// code to populate record
// check output below to see the data
logger.info(s"${avroRecord.toString}\n")

producer.send(new ProducerRecord[String, GenericData.Record](topic, avroRecord), new ProducerCallback)
producer.flush()
producer.close()

Схема и данные в соответствии с выводом.

{"name": "person","type": "record","fields": [{"name": "address","type": {"type" : "record","name" : "AddressUSRecord","fields" : [{"name": "streetaddress", "type": "string"},{"name": "city", "type":"string"}]}}]}

При публикации в Kafka я получаю следующую ошибку.

Error registering Avro schema: 
org.apache.kafka.common.errors.SerializationException:
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:877)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:839)
  1. На основе схемы и данных чего-нибудь не хватает? Моя запись верна?
  2. Кроме того, я хочу знать, как мне заполнить "avro" NULL из Scala? Ни один не работает.

Любая помощь будет оценена по достоинству. Я действительно застрял здесь.

ОБНОВЛЕНИЕ:

Спасибо @ cricket_007 за указание на проблему. Я получаю следующую ошибку:

2019-03-20 13:26:09.660 [application-akka.actor.default-dispatcher-5] INFO  i.c.k.s.KafkaAvroSerializerConfig.logAll(169) - KafkaAvroSerializerConfig values:
        schema.registry.url = [http://myhost:8081]
        max.schemas.per.subject = 1000


Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005

Однако, когда я использую тот же URL (http://myhost:8081) в моем браузере, он работает хорошо. Я могу видеть предметы и другую информацию. Но как только я использую клиент (программу Scala выше), он выходит из строя с указанной выше ошибкой.

Я только что проверил пример кода, как показано ниже, он дает ту же проблему.

val client = new OkHttpClient
    val request = new Request.Builder().url("http://myhost:8081/subjects").build()
    val output = client.newCall(request).execute().body().string()
    logger.info(s"Subjects: ${output}\n")

Я получаю отказ в соединении для URL-адреса реестра схемы.

Subjects: <HEAD><TITLE>Connection refused</TITLE></HEAD>
<BODY BGCOLOR="white" FGCOLOR="black"><H1>Connection refused</H1><HR>
<FONT FACE="Helvetica,Arial"><B>
Description: Connection refused</B></FONT>
<HR>
<!-- default "Connection refused" response (502) -->
</BODY>

Итак, хотел проверить, не упускаю ли я что-нибудь. То же самое работает, когда я запускаю его в браузере, но простой код, как показано выше, не работает.


person Mihir    schedule 15.03.2019    source источник


Ответы (1)


Это ошибка синтаксического анализа HTTP-ответа. Кажется, ваш реестр схемы возвращает не ответ JSON, а скорее некоторый HTML, начинающийся с открытого тега <.

Вы должны проверить, действительно ли реестр запущен на http://myschemahost:8081, и вы можете вручную опубликовать в нем свою схему, используя REST API, чтобы выполнять те же действия, что и сериализатор.

person OneCricketeer    schedule 16.03.2019
comment
Спасибо @ cricket_007. URL-адрес реестра схемы является удаленным, т. Е. Управляется другой командой. Я узнаю, как опубликовать схему вручную, и попробую. Спасибо еще раз. - person Mihir; 17.03.2019