Проблема с шаблоном Flink с Arralist в коде предупреждения?

Я подписался на этот пример и реализован с помощью kafka json с теми же примерами данных.

выборочные данные потребителей {"temperature" : 28,"machineName":"xyz"}

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .flatSelect(new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;


        @Override
        public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
            new Alert("Temperature Rise Detected:" + ((TemperatureEvent) event.get("first")).getTemperature()
                    + " on machine name:" + ((MonitoringEvent) event.get("first")).getMachineName());

        }

Теперь у меня проблема с приведением ArrayList

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at Test.KafkaApp.main(KafkaApp.java:61)

Вызвано: java.lang.ClassCastException: java.util.ArrayList не может быть приведен к Test.TemperatureEvent в Test.KafkaApp$2.flatSelect(KafkaApp.java:53) в org.apache.flink.cep.operator.FlatSelectCepOperator.processMatchedSequences( FlatSelectCepOperator.java:66) в org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:382) в org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:198) в org. .apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) в org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) в org.apache.flink .streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) в org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) в java.lang.Thread.run(неизвестный источник)


person Nathon    schedule 27.02.2019    source источник
comment
Не могли бы вы показать коды inputStream?   -  person Jiayi Liao    schedule 28.02.2019
comment
DataStream‹TemperatureEvent› inputEventStream = env.addSource( new FlinkKafkaConsumer‹TemperatureEvent›(demo, new EventDeserializationSchema(), properties));   -  person Nathon    schedule 28.02.2019


Ответы (1)


Ваш код содержит две проблемы:

  • Прежде всего flatSelect получает Map<String, List<TemperatureEvent>>. Это означает, что вы получаете потенциально несколько TemperatureEvents на шаблон. Таким образом, вы должны выбрать, какой из них вы хотите.
  • Вы не добавляете Alerts к Collector<Alert>. Функция плоской карты не возвращает значения, а выводит их через Collector<Alert>

Без компиляции, я думаю, это должно помочь

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
    .flatSelect(
    new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

        @Override
        public void flatSelect(Map<String, List<TemperatureEvent>> event, Collector<Alert> out) throws Exception {
            TemperatureEvent temperatureEvent = event.get("first").get(0);
            out.collect(new Alert("Temperature Rise Detected:" + temperatureEvent.getTemperature() + " on machine name:" + temperatureEvent.getMachineName()));
        }
       });

Кстати, связанный код из репозитория O'Reilly не будет компилироваться с помощью Flink. PatternSelectFunction имеет неправильную подпись.

person Till Rohrmann    schedule 28.02.2019
comment
но проблема с кастингом - person Nathon; 01.03.2019
comment
Вызвано: java.lang.ClassCastException: java.util.ArrayList не может быть приведен к Test.TemperatureEvent в Test.KafkaApp$2.flatSelect(KafkaApp.java:53) в org.apache.flink.cep.operator.FlatSelectCepOperator.processMatchedSequences( FlatSelectCepOperator.java:66) в org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:382) в org.apache.flink.cep.operator.Abs - person Nathon; 01.03.2019
comment
Я не делаю приведение в своем коде. Какой код вы пробовали? - person Till Rohrmann; 01.03.2019
comment
DataStream‹Alert› patternStream = CEP.pattern(inputEventStream, warningPattern) .select(new PatternSelectFunction‹TemperatureEvent, Alert›() { private static final long serialVersionUID = 1L; public Alert select(Map‹String, TemperatureEvent› event) throws Exception { вернуть новое предупреждение (обнаружено повышение температуры: + ((TemperatureEvent) event.get(first)).getTemperature() + по имени компьютера: + ((MonitoringEvent) event.get(first)).getMachineName()); } - person Nathon; 01.03.2019
comment
Как я писал выше, подпись PatternSelectFunction<TemperatureEvent, Alert>#select это Alert select(Map<String, List<TemperatureEvent> events), а не Alert select(Map<String, TemperatureEvent> event). Также убедитесь, что вы используете одну и ту же версию Flink в качестве зависимости и для кластера. - person Till Rohrmann; 01.03.2019
comment
Да, то же самое, я изменил подпись, что бы вы ни предлагали, но ее получение ArrayList не может быть приведено к проблеме. - person Nathon; 01.03.2019
comment
@Override public Alert select(Map‹String, List‹TemperatureEvent›› event) throws Exception { return new Alert(Обнаружено повышение температуры: + ((TemperatureEvent) event.get(first)).getTemperature() + для имени машины: + ((MonitoringEvent) event.get(first)).getMachineName()); } - person Nathon; 01.03.2019
comment
Не могли бы вы поделиться ссылкой на репозиторий Github с вашим кодом, чтобы я мог взглянуть на проблему? - person Till Rohrmann; 01.03.2019
comment
Конечно. Спасибо - person Nathon; 01.03.2019
comment
Код, которым вы поделились со мной, компилируется и работает. Не уверен, понимаю ли я, в чем именно заключается ваша проблема. - person Till Rohrmann; 01.03.2019
comment
Он не получает никакого предупреждающего сообщения - person Nathon; 01.03.2019
comment
Вы проверили свой ввод? Поставлены ли в очередь темы Kafka требуемые сообщения? - person Till Rohrmann; 01.03.2019
comment
данные {температура: 28,machineName:xyz} - person Nathon; 01.03.2019
comment
Наконец это работает. Проблема с Eventdeserialization, выводящим индекс массива за границы - person Nathon; 01.03.2019