Есть ли какая-либо очередь блокировки Java, которая может сохранять данные на жесткий диск при достижении лимита

Я знаю, что могу использовать JMS и ActiveMQ, но мне действительно нужно что-то очень простое и без больших накладных расходов. Я провел несколько тестов с ActiveMQ, и мне не очень понравилась производительность очередей сохранения.

То, что я ищу, - это базовая реализация любой очереди блокировки с возможностью сохранения сообщения на жестком диске (в идеале), если достигнут некоторый предел размера. Затем он должен иметь возможность прочитать сохраненное сообщение с жесткого диска и, если возможно, прекратить запись нового на жесткий диск (восстановить в памяти).

Мой сценарий очень прост - сообщения (json) приходят из внешнего мира. Я делаю некоторую обработку, а затем отправляю их в другую службу REST. Проблема может возникнуть, когда целевая служба REST не работает или сеть между нами плохая. В этом случае готовые к работе события сохраняются в очереди, которая потенциально может заполнить всю доступную память. Я не хочу/нужно записывать каждое сообщение на HDD/DB - только те, которые не помещаются в память.

Благодарю вас!


person Alex    schedule 06.03.2013    source источник
comment
То, о чем вы просите, не something very simple. Вы, вероятно, хотите something reliable.   -  person Alexander Pogrebnyak    schedule 06.03.2013
comment
ehcache — это самый простой известный мне способ прозрачно перемещать данные на диск и с него. Если порядок в очереди важен, вам нужно будет справиться с этим самостоятельно.   -  person Affe    schedule 06.03.2013
comment
Да, порядок в очереди важен. Кроме того, когда я сказал что-то очень простое, я имел в виду, что мне не нужны кластерные корпоративные решения (потому что я могу использовать ActiveMQ). Вся магия должна происходить внутри 1 JVM. Дополнительная полезная функция - если JVM остановлена ​​- заполнять очередь с жесткого диска, если есть какие-либо сообщения.   -  person Alex    schedule 06.03.2013
comment
Это не просто, но и не ракетостроение. Работа на один-два дня. Просто тщательно спланируйте это на краях очереди.   -  person Hot Licks    schedule 06.03.2013
comment
Я знаю. Вот почему я уже начал его кодировать. Но похоже, что это общая проблема, и я считаю, что должно быть решение. Возможно, не как отдельная библиотека, а как часть более крупного проекта с открытым исходным кодом.   -  person Alex    schedule 07.03.2013
comment
Как только вы начинаете делать что-то с диском, вам, как правило, приходится обдумывать множество вопросов, например допустима ли потеря сообщений (транзакционность) и как масштабировать по горизонтали. Также ваша проблема с сообщениями, которые не помещаются в память, предполагает, что вы говорите об очень огромных объемах данных. Впрочем, вопрос, как уже было сказано, совсем не простой. По крайней мере, для производственного использования. Общее решение подобных проблем заключается в использовании брокера сообщений, такого как ActiveMQ, но есть и другие. Вы можете проверить Apache QPID. Возможно, вам придется настроить этих зверей, чтобы получить от них максимальную отдачу.   -  person Petter Nordlander    schedule 08.03.2013
comment
На самом деле я нашел библиотеку, которая могла бы удовлетворить мои требования: github.com/bulldog2011/bigqueue (BigQueue). Это большая, быстрая и постоянная очередь, основанная на отображенном в память файле. Собираюсь попробовать.   -  person Alex    schedule 09.03.2013


Ответы (2)


Этот код должен работать для вас - это постоянная очередь блокировки в памяти - требуется некоторая настройка файла, но он должен работать

    package test;

     import java.io.BufferedReader;
     import java.io.BufferedWriter;
     import java.io.File;
     import java.io.FileReader;
     import java.io.FileWriter;
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.LinkedList;
     import java.util.List;

     public class BlockingQueue {

    //private static Long maxInMenorySize = 1L;
    private static Long minFlushSize = 3L;

    private static String baseDirectory = "/test/code/cache/";
    private static String fileNameFormat = "Table-";

    private static String  currentWriteFile = "";

    private static List<Object>  currentQueue = new LinkedList<Object>();
    private static List<Object>  lastQueue = new LinkedList<Object>();

    static{
        try {
            load();
        } catch (IOException e) {
            System.out.println("Unable To Load");
            e.printStackTrace();
        }
    }

    private static void load() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()==0){
            //currentQueue = lastQueue = new ArrayList<Object>();
            currentWriteFile = baseDirectory + "Table-1";
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString()+ "\n");
                lastQueue.remove(0);
            }
            writer.close();
        }else{
            if(fileList.size()>0){
                    BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                    String line=null;
                    while ((line=reader.readLine())!=null){
                        currentQueue.add(line);
                    }
                    reader.close();
                    File toDelete = new File(fileList.get(0));
                    toDelete.delete();
            }

            if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1)));
                currentWriteFile = fileList.get(fileList.size()-1);
                String line=null;
                while ((line=reader.readLine())!=null){
                    lastQueue.add(line);
                }
                reader.close();
                //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9));
            }
        }

    }

    private void loadFirst() throws IOException{
        File baseLocation = new File(baseDirectory);
        List<String> fileList = new ArrayList<String>();

        for(File entry : baseLocation.listFiles()){
            if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){
                fileList.add(entry.getAbsolutePath());
            }
        }

        Collections.sort(fileList);

        if(fileList.size()>0){
                BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0)));
                String line=null;
                while ((line=reader.readLine())!=null){
                    currentQueue.add(line);
                }
                reader.close();
                File toDelete = new File(fileList.get(0));
                toDelete.delete();
        }
    }

    public Object pop(){
        if(currentQueue.size()>0)
            return  currentQueue.remove(0);

        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(currentQueue.size()>0)
            return  currentQueue.remove(0);
        else
            return null;
    }

    public synchronized Object waitTillPop() throws InterruptedException{
        if(currentQueue.size()==0){
            try {
                loadFirst();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            if(currentQueue.size()==0)
                wait();
        }
        return currentQueue.remove(0);
    }

    public synchronized void push(Object data) throws IOException{
        lastQueue.add(data);
        this.notifyAll();
        if(lastQueue.size()>=minFlushSize){
            BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile));
            while (!lastQueue.isEmpty()){
                writer.write(lastQueue.get(0).toString() + "\n");
                lastQueue.remove(0);
            }
            writer.close();

            currentWriteFile  = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
                    (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1);
        }
    }

    public static void main(String[] args) {
        try {
            BlockingQueue bq = new BlockingQueue();

            for(int i =0 ; i<=8 ; i++){
                bq.push(""+i);
            }

            System.out.println(bq.pop());
            System.out.println(bq.pop());
            System.out.println(bq.pop());

            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());
            System.out.println(bq.waitTillPop());



        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}
person Naveen Reddy Alka    schedule 19.01.2016
comment
Хорошая попытка, так что не буду голосовать против, но я бы ни за что не стал доверять этому в производственной среде, поскольку JVM может выйти из строя до того, как все будет сохранено на диск. - person user924272; 03.10.2017

Итак, сохранение вашей очереди на диск будет работать, если вы поддержите свою очередь с помощью RandomAccessFile, MemoryMappedFile или MappedByteBuffer... или какой-либо другой эквивалентной реализации. В случае сбоя или преждевременного завершения работы JVM вы можете в значительной степени полагаться на то, что ваша операционная система сохранит незафиксированные буферы на диске. Предостережение заключается в том, что если ваша машина выйдет из строя заранее, вы можете попрощаться с любыми обновлениями в своей очереди, поэтому убедитесь, что вы понимаете это. Вы можете синхронизировать свой диск для гарантированного сохранения, хотя и с сильным ударом по производительности. С более жесткой точки зрения, другой вариант — репликация на другой компьютер для резервирования, что требует отдельного ответа, учитывая его сложность.

person user924272    schedule 02.10.2017