Проблеми с кодера с Apache Beam и CombineFn

Ние изграждаме тръбопровод, използвайки Apache Beam и DirectRunner като бегач. В момента се опитваме да създадем прост тръбопровод, чрез който:

  1. Изтеглете данни от Google Cloud Pub/Sub (в момента използвате емулатора за локално изпълнение)
  2. Десериализирайте в Java обект
  3. Прозоречни събития с фиксирани прозорци от 1 минута
  4. Комбинирайте тези прозорци с помощта на персонализиран CombineFn, който ги трансформира от събития в списък със събития.

Код на тръбопровода:

pipeline
.apply(PubsubIO.<String>read().topic(options.getTopic()).withCoder(StringUtf8Coder.of()))

.apply("ParseEvent", ParDo.of(new ParseEventFn()))

.apply("WindowOneMinute",Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))))              

.apply("CombineEvents", Combine.globally(new CombineEventsFn()));

Функция ParseEvent:

    static class ParseEventFn extends DoFn<String, Event> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            c.output(gson.fromJson(json, Event.class));
        }
    }

Функция CombineEvents:

public static class CombineEventsFn extends CombineFn<Event, CombineEventsFn.Accum, EventListWrapper> {
        public static class Accum {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }

        @Override
        public Accum createAccumulator() {
            return new Accum();
        }

        @Override
        public Accum addInput(Accum accumulator, Event event) {
            accumulator.eventListWrapper.events.add(event);
            return accumulator;
        }

        @Override
        public Accum mergeAccumulators(Iterable<Accum> accumulators) {
            Accum merged = createAccumulator();
            for (Accum accum : accumulators) {
                merged.eventListWrapper.events.addAll(accum.eventListWrapper.events);
            }
            return merged;
        }

        @Override
        public EventListWrapper extractOutput(Accum accumulator) {
            return accumulator.eventListWrapper;
        }

    }

Когато се опитваме да стартираме това локално с помощта на Maven и DirectRunner, получаваме следната грешка:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Unable to return a default Coder for CombineEvents/Combine.perKey(CombineEvents)/Combine.GroupedValues/ParDo(Anonymous).out [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder based on value with class java.lang.Object: No CoderFactory has been registered for the class.
  Building a Coder from the @DefaultCoder annotation failed: Class java.lang.Object does not have a @DefaultCoder annotation.
  Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.lang.Object: org.apache.beam.sdk.coders.protobuf.ProtoCoder$2@6e610150 could not provide a Coder for type java.lang.Object: Cannot provide ProtoCoder because java.lang.Object is not a subclass of com.google.protobuf.Message; org.apache.beam.sdk.coders.SerializableCoder$1@7adc59c8 could not provide a Coder for type java.lang.Object: Cannot provide SerializableCoder because java.lang.Object does not implement Serializable.
  Building a Coder from the @DefaultCoder annotation failed: Class org.apache.beam.sdk.values.KV does not have a @DefaultCoder annotation.
  Using the default output Coder from the producing PTransform failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder based on value with class java.lang.Object: No CoderFactory has been registered for the class.
  Building a Coder from the @DefaultCoder annotation failed: Class java.lang.Object does not have a @DefaultCoder annotation.
  Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.lang.Object: org.apache.beam.sdk.coders.protobuf.ProtoCoder$2@6e610150 could not provide a Coder for type java.lang.Object: Cannot provide ProtoCoder because java.lang.Object is not a subclass of com.google.protobuf.Message; org.apache.beam.sdk.coders.SerializableCoder$1@7adc59c8 could not provide a Coder for type java.lang.Object: Cannot provide SerializableCoder because java.lang.Object does not implement Serializable.
  Building a Coder from the @DefaultCoder annotation failed: Class org.apache.beam.sdk.values.KV does not have a @DefaultCoder annotation.
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
    at org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:143)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:418)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:334)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1459)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1336)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:420)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:350)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
at ***************************.main(***************.java:231)
... 6 more

Извинения за огромното изхвърляне на код - исках да осигуря целия контекст.

Любопитен съм защо се оплаква, че няма кодер по подразбиране както за java.lang.Object, така и за org.apache.beam.sdk.values.KV<K, OutputT> - доколкото мога да разбера, нашият конвейер променя типовете между String, Event и EventListWrapper - последните два класа имат своите кодери по подразбиране, зададени на класа себе си (AvroCoder и в двата случая).

Грешката възниква на реда, където прилагаме CombineFn - може да потвърди, че без тази трансформация конвейерът работи.

Подозирам, че сме настроили комбинираната трансформация по някакъв начин неправилно, но досега не сме намерили нищо в документацията на Beam, което да ни насочи в правилната посока.

Всяко прозрение ще бъде оценено - благодаря предварително!


person Chris Staikos    schedule 16.05.2017    source източник
comment
Коя версия на Beam използвате?   -  person Ben Chambers    schedule 16.05.2017
comment
Ами сега, вероятно трябваше да включи това: 0.6.0   -  person Chris Staikos    schedule 16.05.2017
comment
регистриран ли е кодерът Event.class?   -  person CasualT    schedule 16.05.2017
comment
Вярвам, че е така - имам @DefaultCoder(AvroCoder.class) на тази дефиниция на клас   -  person Chris Staikos    schedule 16.05.2017
comment
Класът CombineEventsFn.Accum има ли регистриран кодер?   -  person Ben Chambers    schedule 16.05.2017
comment
Това не е така! Тъкмо щях да публикувам, че това е проблемът :) Добавих @DefaultCoder(AvroCoder.class) към Accum и това го поправи. Чувствайте се свободни да отговорите и аз ще приема - благодаря!   -  person Chris Staikos    schedule 16.05.2017


Отговори (2)


Вероятната причина да виждате java.lang.Object е, че Beam се опитва да изведе кодер за неразрешена променлива тип, която ще бъде разрешена до Object. Това може да е грешка в начина, по който кодерът прави изводи в рамките на Combine.

Отделно бих очаквал класът Accum също да причини неуспех на извода на кодера. Можете да замените getAccumulatorCoder във вашия CombineFn, за да го предоставите съвсем директно.

person Kenn Knowles    schedule 16.05.2017
comment
Благодаря! Добавих @DefaultCoder(AvroCoder.class) към дефиницията на класа за CombineEventsFn.AvroCoder, което вярвам, че постигна точно това :) - person Chris Staikos; 16.05.2017

Проверихте ли дали добавянето на Serializable към вашия Accumulator работи директно?

Така че добавете "implements Serializable" към клас Accum ...

public static class Accum implements Serializable {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }
person Nishant    schedule 06.03.2019
comment
Можете ли да обясните допълнително как това разрешава даденото изключение? Вече има съществуващ и гласуван отговор, който има друг начин, така че определено трябва да обясните кода си - person Nico Haase; 06.03.2019
comment
Вместо изрично да се посочи кодер за акумулатора, може да се посочи класът Accum като сериализуем (ако приемем, че той наистина е сериализуем). Горната грешка е, защото кодерът за акумулатора не е посочен. Подходът по-горе основно изисква кодиращият да презапише спецификацията на кодиращия по подразбиране, което е добре, ако трябва да бъде направено така. Въпреки това, ако приемем, че класът EventListWrapper може да бъде сериализиран, указването на implements Serializable го прави по-лесно - person Nishant; 08.03.2019
comment
Моля, добавете цялата такава информация към самия отговор, а не към раздела за коментари - person Nico Haase; 08.03.2019