Вызовы веб-служб с разбивкой на страницы Reactor

Я использую реактор в весеннем проекте, где мне нужно вызвать API с разбивкой на страницы. API возвращает что-то вроде этого:

{
  "last": false,
  "totalPages": 2,
  "totalElements": 4178,
  "sort": {
    "sorted": false,
    "unsorted": true
  },
  "first": false,
  "numberOfElements": 1178,
  "size": 3000,
  "number": 0
}

Теперь я пытаюсь добиться использования webflux для вызова сервера до последнего == true.

Я не могу понять, что было бы правильным способом сделать это.

То, что у меня есть до сих пор, это:

Mono<UserInfo> firstUserInfo =  panelistService.getInactiveUserInfo(noOfDays, role, pageNo);


    Flux<User> listOfUsers = firstUserInfo.flatMap(fui ->{

        logger.info("ACCOUNT SERVICE - purgeCronJob - Getting first page of inactive panelists -  page {} total {} last {} panelists {}", pageNo,fui.getTotalNoOfPages(),fui.isLast(),fui.getUserContent().size()); 

        Mono<List<User>> firstListOfUsers = Mono.just(fui.getUserContent());

        if(fui.isLast()) {
            return firstListOfUsers;
        }

        pageNo++;
        int totalPageNo = fui.getTotalNoOfPages();

        for(int i = pageNo; i < totalPageNo; i++) {

            Mono<List<User>> lou = panelistService.getInactiveUserInfo(noOfDays, role, i).map(ui ->{
                logger.info("ACCOUNT SERVICE - purgeCronJob - Getting inactive panelists -  page {} total {} last {} panelists {}", pageNo,ui.getTotalNoOfPages(),ui.isLast(),ui.getUserContent().size()); 
                return ui.getUserContent();
            }); 
            firstListOfUsers.zipWith(lou);
        }

        return firstListOfUsers;

    }).flatMapMany(Flux::fromIterable);

    listOfUsers.subscribe();

Поэтому вместо того, чтобы получать последнее значение, я создаю все моно для каждой страницы и сжимаю их все вместе.


person soipo    schedule 27.02.2019    source источник


Ответы (2)


Я нашел способ, и я надеюсь, что это поможет другим. Я создал метод, который получает неактивные учетные записи и рекурсивно вызывает этот метод. Разница в том, что теперь я возвращаю непосредственно поток пользователей и что я Flux.merge рекурсивные вызовы.

Flux<User> getInactiveUserByPage(Integer noOfDays, String role, Integer pageNo){

    return  panelistService.getInactiveUserInfo(noOfDays, role, pageNo).flatMapMany(ui ->{

        logger.info("ACCOUNT SERVICE - getInactiveUserByPage - Getting inactive panelists -  page {} total {} last {} panelists {}", pageNo,ui.getTotalNoOfPages(),ui.isLast(),ui.getUserContent().size()); 

        Flux<User> users = Flux.fromIterable(ui.getUserContent());

        if(ui.isLast()) {
            return users;
        }

        Integer newPageNo  = pageNo+1;

        Flux<User> next =  getInactiveUserByPage(noOfDays, role, newPageNo).subscribeOn(Schedulers.elastic()).mergeWith(users);

        return next;
    });

}
person soipo    schedule 28.02.2019

Вы можете создать поток в зависимости от количества страниц totalPages, установить noOfDays и роль где-то снаружи и получить totalSize из первого запроса:

Flux.range(0,totalSize)
   .map(pageNo -> panelistService.getInactiveUserInfo(noOfDays, role, pageNo)...
person Frischling    schedule 01.03.2019
comment
Как бы вы увеличили номер страницы, так как он должен увеличиваться при каждом вызове, возможно, атомарное целое число, а на карте увеличить значение? - person soipo; 04.03.2019
comment
Flux.range испускает приращения, и вы можете использовать их, как показано. Я правильно понял ваш вопрос? - person Frischling; 05.03.2019