ThreadPoolExecutor — указание, какой поток обрабатывает данную задачу

Есть ли хороший способ реализовать политику выполнения, которая определяет, какой поток будет обрабатывать данную задачу на основе некоторой схемы идентификации? или это даже хороший подход?

У меня есть требование обработать 1-много файлов, которые я буду получать чередующимися фрагментами. по мере поступления кусков я хочу сделать задачу по обработке этого куска. Загвоздка в том, что я не могу позволить себе роскошь сделать код обработки потокобезопасным, поэтому, как только поток в пуле обработает кусок файла, мне нужен этот же поток для обработки остальной части этого файла. Меня не волнует, обрабатывает ли поток несколько файлов одновременно, но я не могу иметь более одного потока из пула, обрабатывающего один и тот же файл одновременно.

в книге «Java Concurrency in Practice» говорится, что вы можете использовать политики выполнения, чтобы определить, «в каком потоке будет выполняться задача?», но я не понимаю, как это сделать.

Спасибо


person sethro    schedule 12.08.2011    source источник


Ответы (3)


Ну, можно было бы написать свой ThreadPoolExecutor - но в общем-то нет возможности это сделать. Весь смысл пула потоков в том, что вы просто бросаете на него работу, не заботясь о том, какой поток получает какую задачу. Похоже, в этом случае вам нужно будет управлять потоками самостоятельно, сохраняя карту того, какой поток обрабатывает какой файл.

Вы знаете, когда файл был закончен? Если нет, у вас могут возникнуть проблемы с постоянно растущей картой...

person Jon Skeet    schedule 12.08.2011
comment
Вы можете повторно использовать большую часть кода ThreadPoolExecutor и просто добавить фильтр перед execute(), но я согласен, что ваш шаблон на самом деле не соответствует тому, для чего предназначен ThreadPoolExecutor. - person ptyx; 12.08.2011
comment
Решение Джона было бы довольно легко реализовать. Вы по-прежнему должны использовать пулы потоков, но каждый пул с одним потоком (Executors.newSingleThreadExecutor()) связан с одним файлом. Когда приходит блок, вы просто вызываете map.get(fileName(chunk)).execute(new TaskChunk(chunk)). Если вы хотите ограничить количество потоков, вы также можете связать несколько имен файлов с каждым потоком. - person toto2; 12.08.2011
comment
Я знаю, когда я получил последний набор байтов для файла... У меня сложилось впечатление, что отделение отправки задачи от выполнения идеально, и утверждение в книге, которое я цитировал, произвело на меня впечатление, что ThreadPoolExecutor сделал много работа, и мне просто нужно реализовать часть политики; это также дало бы мне больше гибкости, если бы я когда-нибудь смог сделать код обработки потокобезопасным. Первоначально я рассматривал способ, описанный toto, и могу просто использовать этот подход. - person sethro; 12.08.2011
comment
Вы можете отделить политику, поместив карту и пулы потоков в какой-либо другой объект (MyExecutor) и просто вызвав myExecutor.execute(new TaskChunk(chunk));. Затем вам также понадобится метод TaskChunk.getFileName(), чтобы execute() знал, какой поток использовать. - person toto2; 12.08.2011

Хорошей идеей может быть поток для каждого файла:

HashMap<String, MyThreadImplementer> fileToThreadMap...

class MyThreadImplementer implements Runnable {
    int maxNumParts;
    private List<FileChunk> chunkList...
    private List<FileChunk> doneChunks...

    public MyThreadImplementer(int maxNumberOfParts) {
        maxNumParts=maxNumberOfParts;
    }

    public void run() {
        while( doneChunks.size() < maxNumParts ) {
           Thread.sleep(...)
            if ( !chunkList.isEmpty() ) {
                process each chunk in list and mvoe to done chunks
            }
        }
    }
}

Но вам нужно быть осторожным, чтобы не обработать 1000 файлов и тем самым создать 1000 потоков.

person cs94njw    schedule 12.08.2011

Вы говорите, что «не можете позволить себе роскошь сделать код обработки потокобезопасным», но это не означает, что вам нужно сопоставлять файлы с конкретными потоками. Это просто означает, что вы не можете начать обработку следующего фрагмента из файла, пока не завершится обработка последнего фрагмента из этого файла.

Воспользовавшись преимуществами java.util.concurrent, вы можете поддерживать Map<String, LinkedBlockingQueue<FileChunk>> (предполагая, что имя файла является ключом) в основном потоке и назначать каждый фрагмент очереди для соответствующего файла по мере поступления фрагментов. Затем иметь одну блокировку Runnable в каждой очереди.

Таким образом, каждый данный файл будет обрабатываться только одним потоком за раз. И вам не нужно было бы напрямую возиться с потоками или поддерживать несколько пулов потоков.

person John Glassmyer    schedule 15.12.2011