Конкатенация потока не возвращается

Я экспериментирую с репозиториями Spring Boot 2.0, webflux и reactiv Mongo. У меня есть два варианта: сначала удалить, а затем добавить данные в коллекцию. В первом варианте поток блокируется до завершения удаления, во втором варианте добавление данных конкатенируется с удалением.

Вариант А

@GetMapping("init")
public String init() {
    Random rand = new Random();
    Flux<Power> powers = Flux.range(0, 10000)
            .map(i -> new Power(i,
                    LocalDateTime.now().toEpochSecond(ZoneOffset.of("+1")),
                    rand.nextDouble()));
    powerRepository.deleteAll().block();
    powerRepository.save(powers).blockLast();
    return "ok";
}

Вариант Б

@GetMapping("init")
public String init() {
    Random rand = new Random();
    Flux<Power> powers = Flux.range(0, 10000)
            .map(i -> new Power(i,
                    LocalDateTime.now().toEpochSecond(ZoneOffset.of("+1")),
                    rand.nextDouble()));
    powerRepository.deleteAll()
            .concatWith((v) -> powerRepository.save(powers)).blockLast();
    return "ok";
}

Вариант А возвращается, вариант Б нет. В чем разница? Как правильно объединить две операции с репозиторием?


person Gregor    schedule 28.02.2017    source источник
comment
В основном то, что написал Грег. Разница связана с выдачей значений: deleteAll() завершается без выдачи значения и объединения результата во время выполнения deleteAll. Concat выполняется во время выполнения родительского издателя, что может привести к нежелательному поведению. Как правило, возвращайте реактивный тип (Mono, Flux) вместо разрешенного типа (String), чтобы сохранить реактивное поведение. В противном случае вы все равно блокируете вызывающий поток. Spring WebFlux обрабатывает выполнение за вас.   -  person mp911de    schedule 01.03.2017


Ответы (1)


Цепочка с использованием .then вызывает, если нет ничего лучше. Избегайте блочных вызовов и вместо этого возвращайте Mono.just("ok").

public Mono<String> init() {
    return repo.deleteAll()
        .then(() -> repo.save(...))
        .then(() -> Mono.just("ok"));
}

Заставьте конечную точку возвращать Mono.

person gregturn    schedule 01.03.2017
comment
Если вам нужно сохранить список сущностей, верните Flux‹String› в качестве типа ответа и используйте .thenMany(() -> repo.save(powers)).thenMany(() -> Flux.just(ok)) - person Orest; 01.03.2017