Чтение больших файлов в многопоточном режиме

Я реализую класс, который должен получить большой текстовый файл. Я хочу разбить его на куски, и каждый кусок будет удерживаться другим потоком, который будет подсчитывать частоту каждого символа в этом куске. Я ожидаю, что запуск большего количества потоков повысит производительность, но оказывается, что производительность ухудшается. Вот мой код:

public class Main {

    public static void main(String[] args) 
    throws IOException, InterruptedException, ExecutionException, ParseException  
    {

        // save the current run's start time
        long startTime = System.currentTimeMillis();

        // create options 
        Options options = new Options();
        options.addOption("t", true, "number of threads to be start");

        // variables to hold options 
        int numberOfThreads = 1;

        // parse options
        CommandLineParser parser = new DefaultParser();
        CommandLine cmd;
        cmd = parser.parse(options, args);
        String threadsNumber = cmd.getOptionValue("t");
        numberOfThreads = Integer.parseInt(threadsNumber);

        // read file
        RandomAccessFile raf = new RandomAccessFile(args[0], "r");
        MappedByteBuffer mbb 
            = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());

        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
        Set<Future<int[]>> set = new HashSet<Future<int[]>>();

        long chunkSize = raf.length() / numberOfThreads;
        byte[] buffer = new byte[(int) chunkSize];

        while(mbb.hasRemaining())
        {
            int remaining = buffer.length;
            if(mbb.remaining() < remaining)
            {
                remaining = mbb.remaining();
            }
            mbb.get(buffer, 0, remaining);
            String content = new String(buffer, "ISO-8859-1");
            @SuppressWarnings("unchecked")
            Callable<int[]> callable = new FrequenciesCounter(content);
            Future<int[]> future = pool.submit(callable);
            set.add(future);

        }

        raf.close();

        // let`s assume we will use extended ASCII characters only
        int alphabet = 256;

        // hold how many times each character is contained in the input file
        int[] frequencies = new int[alphabet];

        // sum the frequencies from each thread
        for(Future<int[]> future: set)
        {
            for(int i = 0; i < alphabet; i++)
            {
                frequencies[i] += future.get()[i];
            }
        }
    }

}

//help class for multithreaded frequencies` counting
class FrequenciesCounter implements Callable
{
    private int[] frequencies = new int[256];
    private char[] content;

    public FrequenciesCounter(String input)
    {
        content = input.toCharArray();
    }

    public int[] call()
    {
        System.out.println("Thread " + Thread.currentThread().getName() + "start");

        for(int i = 0; i < content.length; i++)
        {
            frequencies[(int)content[i]]++;
        }

        System.out.println("Thread " + Thread.currentThread().getName() + "finished");

        return frequencies;
    }
}

person barni    schedule 24.06.2017    source источник
comment
Ваше оборудование может передавать с диска столько байтов в секунду. Неважно, сколько вы просите прочитать.   -  person Henry    schedule 24.06.2017
comment
Диск не многопоточный. Ваши ожидания неверны.   -  person user207421    schedule 24.06.2017
comment
Итак, если я сохраню каждый фрагмент как отдельный файл, а затем передам каждый файл потоку, должно ли это стать лучше?   -  person barni    schedule 24.06.2017
comment
Нет. Нет, если они все еще находятся на одном диске.   -  person Henry    schedule 24.06.2017
comment
@барни нет. У вас все равно будет один диск. Это, вероятно, ухудшило бы ситуацию.   -  person JB Nizet    schedule 24.06.2017
comment
хм.. и нет возможности сделать это быстрее на одной машине?   -  person barni    schedule 24.06.2017
comment
@barni, только если у вас несколько физических дисков. Однако вы можете прочитать весь файл в одном потоке и обработать его несколькими потоками.   -  person Matej Kormuth    schedule 24.06.2017
comment
В любом случае единственное, что вы используете в многопоточности, — это поиск. Вы все еще загружаете файл в основном потоке, и это будет то, куда уходит время,   -  person user207421    schedule 24.06.2017
comment
Итак, немного теории: если в самом вашем коде нет узких мест, кроме накладных расходов на ожидание сети, жесткого диска или чего-то еще, говорят, что он привязан к вводу-выводу. В этот момент единственный способ заставить его работать быстрее — это улучшить аппаратное обеспечение, подключенное к самой машине. Или начните горизонтальное масштабирование, чтобы использовать больше независимых машин.   -  person Qix - MONICA WAS MISTREATED    schedule 24.06.2017


Ответы (2)


Как указано в комментариях, вы (обычно) не получите лучшей производительности при чтении из нескольких потоков. Вместо этого вам следует обработать фрагменты, которые вы прочитали в нескольких потоках. Обычно обработка выполняет некоторую блокировку, операции ввода-вывода (сохранение в другой файл? сохранение в базу данных? HTTP-вызов?), и ваша производительность улучшится, если вы будете обрабатывать несколько потоков.

Для обработки у вас может быть ExecutorService (с разумным количеством потоков). используйте java.util.concurrent.Executors для получения экземпляра java.util.concurrent.ExecutorService

Имея экземпляр ExecutorService, вы можете отправьте свои фрагменты на обработку. Отправка фрагментов не будет блокироваться. ExecutorService начнет обрабатывать каждый фрагмент в отдельном потоке (детали зависят от конфигурации ExecutorService ). Вы можете отправить экземпляры Runnable или Callable.

Наконец, после того как вы отправите все элементы, вы должны вызвать awaitTermination в вашем ExecutorService. Он будет ждать, пока не будет завершена обработка всех отправленных элементов. После возврата awaitTermination вы должны вызвать shutdownNow(), чтобы прервать обработку (иначе она может зависнуть на неопределенный срок, обрабатывая какую-то мошенническую задачу).

person Bartosz Bilicki    schedule 24.06.2017
comment
И если один поток обработки может не отставать от скорости чтения, многопоточность — бессмысленное усложнение. - person Kevin Krumwiede; 24.06.2017
comment
Он уже читает в одном потоке и обрабатывает в нескольких потоках, и он уже использует ExecutorService. Это, кажется, не отвечает на вопрос. - person Warren Dew; 24.06.2017

Ваша программа почти наверняка ограничена скоростью чтения с диска. Использование нескольких потоков не помогает в этом, поскольку ограничение является аппаратным ограничением скорости передачи информации с диска.

Кроме того, использование как RandomAccessFile, так и последующего буфера, вероятно, приводит к небольшому замедлению, поскольку вы перемещаете данные в память после их чтения, но перед обработкой, а не просто обрабатываете их на месте. Вам лучше не использовать промежуточный буфер.

Вы можете получить небольшое ускорение, читая из файла непосредственно в окончательные буферы и отправляя эти буферы для обработки потоками по мере их заполнения, а не ожидая, пока весь файл будет прочитан перед обработкой. Однако большая часть времени по-прежнему будет использоваться для чтения с диска, поэтому любое ускорение, вероятно, будет минимальным.

person Warren Dew    schedule 24.06.2017