Java Spring Async изпълнение

Използвайки jhipster при пролетно зареждане 1.5.4, трудно ми е да накарам фоновите задачи да се изпълняват асинхронно; те изглежда се изпълняват синхронно, използвайки различен taskExecutor и набор от нишки от този, който съм конфигурирал.

Всичко това се случва в услуга, която за яснота е дефинирана така:

@Service
@Transactional
public class AppService {
    @Scheduled(fixedDelay = 3000)
    public void consumeData() {
        // connect to a subscriber and push data to the workerBee
        for(Tuple data : this.getTuples()) {
            workerBee(data);
        }
    }

    @Timed
    @Async
    public void workerBee(Tuple data) throws Exception {
        // ... do something that takes 300ms ....
        Thread.sleep(300);
    }
}

Може да се каже, че услугата не е идеалното място за тази работа, но за демонстрационни цели е подходящо.

(също като настрана, изглежда, че @Timed не работи, но прочетох някъде, че @Timed не работи, когато се извика вътрешно в рамките на услугата)

Съответен раздел от application.yml:

jhipster:
    async:
        core-pool-size: 8
        max-pool-size: 64
        queue-capacity: 10000

Използвайки по подразбиране, генериран AsyncConfiguration.java, който изглежда така:

@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    executor.setThreadNamePrefix("app-Executor-");
    return new ExceptionHandlingAsyncTaskExecutor(executor);
}

Проверих, че компонентът taskExecutor се създава и се използва от liquibase.

Когато свържа visualvm, виждам цялата работа, която се случва в pool-2-thread-1, което трябва да е някакъв вид по подразбиране и е очевидно, че работата се извършва синхронно, а не асинхронно.

Неща, които съм пробвал:

  • Посочване на изпълнителя в анотацията @Async като @Async("taskExecutor")
  • Проверка на конфигурацията на taskExecutor с 8 нишки в основния размер на пула.
  • Проверка дали приложението има анотацията @EnableAsync (тя има по подразбиране).

person W. Smith    schedule 22.08.2017    source източник


Отговори (3)


Една алтернатива е промяната на @Bean getAsyncExecutor на това:

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor () {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    executor.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    executor.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    executor.setThreadNamePrefix("app-Executor-");
    return executor;
}
person Daniel C.    schedule 22.08.2017
comment
Не виждам какво сте направили тук, освен да премахнете обвивката ExceptionHandlingAsyncTaskExecutor около изпълнителя. Защо би/трябва това да помогне? - person W. Smith; 22.08.2017
comment
Игнорирайте отговора, просто разбрах, че използвате jhipster - person Daniel C.; 22.08.2017

Изглежда, че не спазвам правилата, изложени тук: http://www.baeldung.com/spring-async. Най-вече самоизвикване:

@Async има две ограничения:

it must be applied to public methods only
self-invocation – calling the async method from within the same class – won’t work
person W. Smith    schedule 23.08.2017

Алтернатива 2: извикайте асинхронния метод в същия клас с помощта на CompletableFuture

Ето друг подход, ако трябва да използвате асинхронен метод и да го извикате в същия клас, като използвате CompletableFuture и инжектирате Executor, генериран от AsyncConfiguration

@Service
public class MyAsyncProcess {

    private final Logger log = LoggerFactory.getLogger(MyAsyncProcess.class);

    @Autowired
    Executor executor;


    @Scheduled(cron = "*/8 * * * * *")
    public void consumeData() {

        IntStream.range(0,20).forEach( (s) ->
            CompletableFuture.supplyAsync( () -> { return workerBeeCompletableFuture(String.valueOf(s)); } , executor));
    }


    public  CompletableFuture<String> workerBeeCompletableFuture(String data)  {

        log.debug("workerBeeCompletableFuture: Iteration number: " + data + " Thread: " + Thread.currentThread().getName());

        try { Thread.sleep(2000); }
        catch (InterruptedException e) { e.printStackTrace(); }

        return CompletableFuture.completedFuture("finished");

    }

Алтернатива 1 с използване на @Async

Най-накрая разбирам какво причинява това поведение, всъщност @Scheduler извиква workerBee като локален метод, а не като @Async метод.

За да накарате @Async да работи, просто създайте нов @Service за @Schedule, наречен MySchedulerService и @Autowired AppService в MySchedulerService. Също така премахнете @Schedule от класа AppService.

Трябва да е така:

@Service
public class MyAsyncProcess {

    private final Logger log = LoggerFactory.getLogger(MyAsyncProcess.class);

    @Async
    public  void workerBeeAsync(String data)  {
        // ... do something that takes 300ms ....
        try {
            log.debug("Iteration number: " + data + " Thread: " + Thread.currentThread().getName());

            Thread.sleep(2000);
            log.debug("finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

И услугата за график

@Service
public class MySchedule {

    @Autowired
    MyAsyncProcess myAsyncProcess;

    @Scheduled(cron = "*/8 * * * * *")
    public void consumeData() {

        IntStream.range(0,20).forEach(s ->
            myAsyncProcess.workerBeeAsync(String.valueOf(s)));

    }

}

В application.yml използвам следните стойности:

jhipster:
    async:
        core-pool-size: 50
        max-pool-size: 100
        queue-capacity: 10000

@Async автоматично ще открие Executor, конфигуриран в AsyncConfigurer клас.

Дано ти помогне.

person Daniel C.    schedule 23.08.2017