Я обнаружил потерю данных в Esper (v.7.1.0) в случае, если включен пул входящих потоков. Вот простой пример, демонстрирующий это странное поведение:
Configuration config = new Configuration();
// set up concurrent processing
config.getEngineDefaults().getThreading().setThreadPoolInbound(true);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
// simple schema
epService.getEPAdministrator().createEPL("create objectarray schema LogLine as (account_name string, value int) ");
// event for terminating context partition
epService.getEPAdministrator().createEPL("create schema TerminateEvent() ");
// Allocates context partition for each account_name. Start it on LogLine event and terminate on TerminateEvent.
epService.getEPAdministrator()
.createEPL("create context NestedCtx " +
"context InitCtx start LogLine end TerminateEvent ," +
"context AccountCtx partition by account_name from LogLine");
// select to collect count of events per account_name.
EPStatement statement = epService.getEPAdministrator().createEPL(" context NestedCtx select account_name, count(*) as total from LogLine output last when terminated ");
// attach listener for printing results
statement.addListener(new UpdateListener() {
@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
for (EventBean eventBean : newEvents) {
String properties = Arrays.stream(eventBean.getEventType().getPropertyNames()).map((prop) -> {
return prop + " " + eventBean.get(prop);
}).collect(Collectors.joining("; "));
System.out.println(properties);
}
}
});
//send 3 LogLine events
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
// send terminate event in order to get results
epService.getEPRuntime().sendEvent(Collections.emptyMap(), "TerminateEvent");
System.out.println("finish");
Проблема в том, что UpdateListener не вызывается, когда включена параллельная обработка. Результат печатается только тогда, когда я отключаю пул входящих потоков. В чем причина такого поведения?