Несколько потоков ждут пакетной операции

У меня есть процесс, который отправляет несколько задач в ExecutorService, скажем, MyTask. Моя задача запрашивает значение из внешней службы, скажем, ExternalService. Я пытаюсь сделать так, чтобы ExternalService обрабатывал запросы в пакетном режиме, скажем, каждые 100 входящих или каждую 1 секунду, и до этого времени задерживать потоки MyTask, пока они не получат ответ:

public class MyTask implements Runnable {

    @Override
    public void run() {
        try {
            // .... code ....


            ExternalData data = externalService.getData(id);

            // ..... code after batch ...

        }catch (Exception e){

        }
    }
}

Мне нужна служба externalService для удержания MyTask до тех пор, пока она не выполнит свою операцию в пакетном режиме (путем накопления 100 запросов или ожидания в течение 1 секунды) и вернет результат задаче. продолжать.

Как лучше всего подойти к этой проблеме?

Спасибо


person tbo    schedule 24.01.2015    source источник


Ответы (1)


Этого можно добиться с помощью CyclicBarrier.

CyclicBarriers полезны в программах, включающих группу потоков фиксированного размера, которые должны время от времени ожидать друг друга.

Создайте барьер, к которому могут получить доступ все задачи. Аргументы — это количество задач, которые вы хотите ожидать при вызове await(), и действие, которое нужно выполнить после ожидания:

CyclicBarrier barrier = new CyclicBarrier(100, action);

Каждая задача должна вызывать await. Это будет ждать, пока в общей сложности 100 задач не достигнут этой точки или до 1 секунды:

barrier.await(1, TimeUnit.SECONDS);

В случае тайм-аута все равно запустите action. action — это исполняемый файл, который вызывает внешнюю службу. Поскольку action выполняется одновременно, вам, вероятно, также нужно заставить свои задачи ждать его завершения и затем получать результаты.


Чтобы получить фактические результаты внешней службы, я бы, вероятно, использовал что-то вроде следующего. В основном сохраните идентификатор в списке и заблокируйте его, пока результат не будет готов.

action.addId(id);
barrier.await(...); // run batch action using all IDs
result = action.getResult(id); // blocks until result is ready
person kapex    schedule 24.01.2015
comment
Это хорошая функция, о которой я не знал, поэтому, если моя внешняя служба является действием, которое я передаю циклическому барьеру, как все потоки получат результат выполнения и продолжат работу? у каждого будет свой идентификатор, еще раз спасибо - person tbo; 25.01.2015
comment
@tbo Действие будет оболочкой вашего сервиса, которая также обрабатывает идентификаторы и ответ. Я добавил пример. Я думаю, что этот подход в основном должен работать, но я уверен, что в нем все еще отсутствуют некоторые вещи, а также требуется тщательная обработка ошибок. - person kapex; 25.01.2015
comment
На данном этапе я думаю, что этот подход действительно близок к тому, что я пытаюсь сделать, я бы предпочел не выполнять мой код при улавливании, поскольку, если достигнут предел времени ожидания, выдается исключение TimeoutException, и действие не выполняется, а другие потоки получают сломало исключение, сказав, что я должен перехватывать исключения и действовать соответственно для достижения моего сценария - person tbo; 25.01.2015