Я пытаюсь создать несколько модульных тестов, чтобы убедиться, что определенные части моей топологии Trident делают то, что должны.
Я хотел бы получить все значения, полученные после запуска топологии, и поместить их в список, чтобы я мог "видеть" их и проверять условия для них.
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
Я запускаю топологию как TrackedTopology
(получил код из другой вопрос SO, спасибо @brianghig за вопрос и @Thomas Kielbus за ответ)
Вот как я «запускаю» топологию и как я ввожу в нее образцы значений:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
Когда я делаю это, я вижу в сообщениях журнала, что топология работает правильно и что значения вычисляются правильно, но я хотел бы «выловить» результаты в List
(или любую другую структуру, на данный момент ), так что я действительно могу поставить некоторые Asserts
в свои тесты.
Я пробовал [чертову тонну] разных подходов, но ни один из них не работает.
Последней идеей было добавить болт после агрегации, чтобы он "сохранял" мои значения в списке:
Ниже вы увидите класс, который пытается просмотреть все кортежи, созданные aggregate
, и поместить их в список, который я ранее инициализировал:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
this.results = results;
}
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector) {
for (TridentTuple tuple : tuples) {
results.add((AggregatedFoo) tuple.getValue(0));
}
}
}
Итак, теперь код будет выглядеть так:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
Но ничего... В логах Done. Checkpoint results=[]
(пустой список)
Есть ли способ получить это? Я думаю, что это должно быть выполнимо, но я не смог найти способ...
Любой намек или ссылка на страницы или что-либо подобное будут оценены. Заранее спасибо.