Отсутствующие сообщения от TopicProcessor в Reactor 3

Я запускаю простой тест, в котором я публикую сообщения в TopicProcessor из 4 потоков, а в подписчике просто добавляю их в коллекцию. Код следующий:

@Test
public void testProcessingMessages() throws Exception {
    int numberOfMessages = 1000;

    TopicProcessor<Integer> processor = TopicProcessor.create();

    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Queue<Integer> messages = new ConcurrentLinkedQueue<>();

    processor.subscribe(messages::add);

    AtomicInteger counter = new AtomicInteger(0);
    for (int i = 0; i < numberOfMessages; i++) {
        executorService.submit(() -> {
            processor.onNext(counter.incrementAndGet());
        });
    }

    Thread.sleep(10000);

    assertEquals(numberOfMessages, messages.size());
}

Но утверждение в конце концов терпит неудачу, обычно около 980-990 фактических сообщений вместо ожидаемых 1000. Я что-то упустил?


person FVlad    schedule 13.04.2017    source источник


Ответы (1)


Проблема заключалась в том, что TopicProcessor.create создает процессор, который ожидает публикации из одного потока. TopicProcessor.share следует использовать при создании из нескольких потоков.

person FVlad    schedule 13.04.2017