Потеря данных Esper при включении входящей потоковой передачи

Я обнаружил потерю данных в Esper (v.7.1.0) в случае, если включен пул входящих потоков. Вот простой пример, демонстрирующий это странное поведение:

    Configuration config = new Configuration();
    // set up concurrent processing
    config.getEngineDefaults().getThreading().setThreadPoolInbound(true);

    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);

    // simple schema
    epService.getEPAdministrator().createEPL("create objectarray schema LogLine as (account_name string, value int) ");
    // event for terminating context partition
    epService.getEPAdministrator().createEPL("create schema TerminateEvent() ");

    // Allocates context partition for each account_name. Start it on LogLine event and terminate on TerminateEvent.
    epService.getEPAdministrator()
            .createEPL("create context NestedCtx " + 
                       "context InitCtx start LogLine end TerminateEvent ," + 
                       "context AccountCtx partition by account_name from LogLine");
    // select to collect count of events per account_name.
    EPStatement statement = epService.getEPAdministrator().createEPL(" context NestedCtx select account_name, count(*) as total from LogLine output last when terminated ");
    // attach listener for printing results 
    statement.addListener(new UpdateListener() {

        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            for (EventBean eventBean : newEvents) {
                String properties = Arrays.stream(eventBean.getEventType().getPropertyNames()).map((prop) -> {
                    return prop + " " + eventBean.get(prop);
                }).collect(Collectors.joining("; "));
                System.out.println(properties);
            }

        }
    });
    //send 3 LogLine events
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");
    epService.getEPRuntime().sendEvent(new Object[] { "TEST", 10 }, "LogLine");

    // send terminate event in order to get results
    epService.getEPRuntime().sendEvent(Collections.emptyMap(), "TerminateEvent");
    System.out.println("finish");

Проблема в том, что UpdateListener не вызывается, когда включена параллельная обработка. Результат печатается только тогда, когда я отключаю пул входящих потоков. В чем причина такого поведения?


person Taras    schedule 05.04.2018    source источник


Ответы (1)


Входящие потоки могут изменить порядок обработки событий, поскольку JVM может обрабатывать поставленные в очередь задачи в любом порядке. Поэтому, когда ваш вариант использования требует упорядоченной обработки событий, это означает, что входящая потоковая передача не является правильным выбором. Вместо этого код вашего приложения может выделять свою очередь/потоки и связывать события с потоками, обеспечивая сохранение порядка. Например, как описано в Вопрос о StackOverflow.

person user650839    schedule 05.04.2018
comment
Спасибо за ответ. Но меня не волнует порядок в моем примере. Я отправляю 3 одинаковых события { account_name: TEST, value: 10 }, а затем для получения результатов отправляется событие TerminateEvent. В конце мне нужно получить SELECT account_name, COUNT(*) ... GROUP BY account_name, который равен {account_name:TEST, count:3}. Порядок не важен, я просто хочу, чтобы Эспер обрабатывал все события без потери данных. - person Taras; 06.04.2018
comment
Контекст InitCtx start LogLine end TerminateEvent означает, что анализ начинается, когда приходит LogLine. Однако JVM может просто приостановить поток, предоставляя это событие LogLine, и JVM может продолжить обработку всех других событий и даже может обрабатывать LogLine как последнее событие в худшем случае. кейс. При обработке входящих потоков порядок, в котором обрабатываются события, может быть любым, и вы должны принять наихудший порядок. - person user650839; 06.04.2018
comment
Спасибо еще раз! Не могли бы вы представить реальный пример использования входящей потоковой передачи? - person Taras; 06.04.2018
comment
выберите count(*) из MyEvent (и всех его вариантов) - person user650839; 08.04.2018
comment
Я отключил пул входящих потоков и начал одновременно отправлять события. К сожалению, результаты нестабильны. Вот соответствующий вопрос: stackoverflow.com/questions/49738825/ - person Taras; 09.04.2018