Как сохранить значения из Trident/Storm в списке (используя Java API)

Я пытаюсь создать несколько модульных тестов, чтобы убедиться, что определенные части моей топологии Trident делают то, что должны.

Я хотел бы получить все значения, полученные после запуска топологии, и поместить их в список, чтобы я мог "видеть" их и проверять условия для них.

   FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
   TridentTopology topology = new TridentTopology();
   topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    // Soo... how do I retrieve the "aggregated_foos" from here?

Я запускаю топологию как TrackedTopology (получил код из другой вопрос SO, спасибо @brianghig за вопрос и @Thomas Kielbus за ответ)

Вот как я «запускаю» топологию и как я ввожу в нее образцы значений:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));

Когда я делаю это, я вижу в сообщениях журнала, что топология работает правильно и что значения вычисляются правильно, но я хотел бы «выловить» результаты в List (или любую другую структуру, на данный момент ), так что я действительно могу поставить некоторые Asserts в свои тесты.

Я пробовал [чертову тонну] разных подходов, но ни один из них не работает.

Последней идеей было добавить болт после агрегации, чтобы он "сохранял" мои значения в списке:

Ниже вы увидите класс, который пытается просмотреть все кортежи, созданные aggregate, и поместить их в список, который я ранее инициализировал:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
        this.results = results;
    }

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            results.add((AggregatedFoo) tuple.getValue(0));
        }
    }
}

Итак, теперь код будет выглядеть так:

// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    .partitionPersist(new FieldFetcherFactory(),
                        new Fields("aggregated_foos"),
                        new FieldFetcherStateUpdater(results));

     LOGGER.info("Done. Checkpoint results={}", results);

Но ничего... В логах Done. Checkpoint results=[] (пустой список)

Есть ли способ получить это? Я думаю, что это должно быть выполнимо, но я не смог найти способ...

Любой намек или ссылка на страницы или что-либо подобное будут оценены. Заранее спасибо.


person BorrajaX    schedule 20.01.2016    source источник


Ответы (1)


Вам нужно использовать статическую переменную-член result. Если у вас запущено несколько параллельных задач (например, parallelism_hint > 1), вам также необходимо synchronize предоставить доступ на запись к result.

В вашем случае result будет пустым, потому что внутри Storm создается новый экземпляр вашего болта (включая новый экземпляр ArrayList). Использование статической переменной гарантирует, что вы получите доступ к правильному объекту (так как он будет только один для всех экземпляров вашего болта).

person Matthias J. Sax    schedule 20.01.2016
comment
Оно работает! По какой-то причине я чувствую себя таким грязным, делая это... но это работает!! Ура! Спасибо - person BorrajaX; 20.01.2016