XML-поток Apache Flink Kakfa

Я получаю следующую ошибку при попытке запустить приложение Flink Streaming.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.test.SwissProt: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?)
at [Source: [B@681c6d54; line: 1, column: 12]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:261)
at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456)
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1203)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2920)
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:17)
at com.test.SwissProtDeserializationSchema.deserialize(SwissProtDeserializationSchema.scala:9)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)

Я думал в Scala, что когда вы создаете класс case, создается конструктор по умолчанию? Я не понимаю ошибки. Пожалуйста помоги!

У меня есть следующие объекты Scala:

Основной объект scala для запуска Flink Streaming

package com.test

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

object Run {

   def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val properties = new Properties()
     properties.setProperty("bootstrap.servers", "localhost:9092")
     properties.setProperty("group.id", "test")
     val rawStream = env.addSource(new FlinkKafkaConsumer09("XML", new SwissProtDeserializationSchema,properties))

     rawStream.print
     env.execute()
   }

 }

Класс Case, описывающий ввод

 package com.test

 case class SwissProt (name: String,
                       address: String,
                       phoneNumber: String,
                       cellPhoneNumber: String
                       ) {

 }

Наконец, класс десериализации для извлечения события Kafka в мой объект класса case

 package com.test

 import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
 import com.fasterxml.jackson.dataformat.xml.XmlMapper

 class SwissProtDeserializationSchema extends AbstractDeserializationSchema[SwissProt]{
   private var xmlMapper: XmlMapper = null

   override def deserialize(bytes: Array[Byte]): SwissProt = {
     if (xmlMapper == null) {
       xmlMapper = new XmlMapper()
     }

     xmlMapper.readValue(bytes, classOf[SwissProt])
   }
 }

person Crackerman    schedule 03.10.2016    source источник
comment
Компилятор синтезирует фабричный метод в сопутствующем объекте, но я не думаю, что он также синтезирует конструктор по умолчанию (в смысле Java, конструктор без параметров). Похоже, что используемая вами библиотека требует соблюдения соглашения о Bean, поэтому, если вы хотите написать ее на Scala, я бы посоветовал вам создать обычный класс с vars, аннотировать их с помощью @BeanProperty (scala-lang.org/files/archive/spec/2.11 /) и добавьте конструктор по умолчанию (stackoverflow.com/ questions / 6874329 /).   -  person stefanobaghino    schedule 03.10.2016
comment
Это для информации. Вы заставили меня внимательнее изучить аннотации к файлу jackson-dataformat-xml. Оказывается, мне нужно использовать @JacksonXmlProperty для пары моих полей. Кроме того, мне нужен был дополнительный класс case для перехода к моему исходному классу case. Я отправлю последний урок, когда он меня полностью устроит. Еще раз спасибо!   -  person Crackerman    schedule 03.10.2016


Ответы (1)


Вам просто нужно зарегистрировать DefaultScalaModule. Так

    xmlMapper.registerModule(DefaultScalaModule)

The scala import :
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

The maven dependency:
            <dependency>
                <groupId>com.fasterxml.jackson.module</groupId>
                <artifactId>jackson-module-scala_2.11</artifactId>
                <version>2.6.5</version>
            </dependency>
person Basanth Roy    schedule 17.01.2017