Почему происходит сбой ksqldb-server при обновлении с v0.6.0 до v0.8.1?

У меня есть существующий ksqldb-сервер, который я хочу обновить с v0.6.0 с подключением как отдельный узел до v0.8.1 со встроенным подключением. Однако ksqldb-server аварийно завершает работу после запуска со следующим журналом:

ksqldb-server      | [2020-04-07 07:24:37,266] ERROR Error during restore (io.confluent.ksql.rest.server.computation.CommandRunner:191)
ksqldb-server      | org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
ksqldb-server      | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      | Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      |    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
ksqldb-server      |    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1206)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:504)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3266)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.InternalTopicSerdes$InternalTopicDeserializer.deserialize(InternalTopicSerdes.java:59)
ksqldb-server      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1309)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1540)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1376)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
ksqldb-server      |    at io.confluent.ksql.rest.server.CommandTopic.getRestoreCommands(CommandTopic.java:88)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandStore.getRestoreCommands(CommandStore.java:211)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandRunner.processPriorCommands(CommandRunner.java:155)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.initialize(KsqlRestApplication.java:424)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startKsql(KsqlRestApplication.java:354)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startAsync(KsqlRestApplication.java:338)
ksqldb-server      |    at io.confluent.ksql.rest.server.ExecutableServer.startAsync(ExecutableServer.java:47)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.doAction(MultiExecutable.java:63)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.startAsync(Mult iExecutable.java:42)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:75)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:61)
ksqldb-server      | [2020-04-07 07:24:37,286] INFO Server shutting down (io.confluent.ksql.rest.server.KsqlServerMain:79)

Ключевое сообщение здесь: Нераспознанное поле «useOffsetAsQueryID» (класс io.confluent.ksql.rest.server.computation.Command), не отмеченное как игнорируемое (4 известных свойства: «план», «исходные свойства», «потоки свойств. "," заявление "])

Единственный ресурс, который я смог найти, - это эта функция, но она уже была в v0.6.0: https://github.com/confluentinc/ksql/pull/3343

Любая идея? Спасибо за вашу помощь.


person reneveyj    schedule 07.04.2020    source источник


Ответы (1)


Были критические изменения с 0.6 на 0.8.1. Вы видите ошибку, потому что KSQL 0.8.1 не может использовать потоки 0.6, прочитанные из раздела команд. Чтобы исправить это, следуйте этим инструкциям по обновлению: https://github.com/confluentinc/ksql/blob/master/docs/operate-and-deploy/installation/upgrading.md

Было бы проще, если бы вы могли вернуться к своей версии 0.6, чтобы получить все потоки / таблицы / типы из KSQL, а затем воссоздать их в 0.8.1.

person Sergio    schedule 07.04.2020
comment
не могли бы вы предоставить обновленную ссылку в своем ответе. существующий говорит 404. - person Nitesh Ratnaparkhe; 13.05.2020