Получение лучшей производительности с ActiveMQ, чем с RabbitMQ

Мне интересно, необычно ли достигать более высокой пропускной способности необработанных сообщений (как для публикации, так и для потребления) с помощью ActiveMQ, а не RabbitMQ? Я спрашиваю, потому что КАЖДЫЙ другой онлайн-справочник, с которым я сталкивался, хвастается, что RabbitMQ работает быстрее.

Я не тестирую законные инструменты для тестирования; вместо этого я изменил базовые примеры издателя/потребителя для обоих, чтобы протестировать 100 000 сообщений с телом сообщения размером 3 килобайта. Обратите внимание, что я тестирую как публикацию, так и потребление в двух разных экземплярах Amazon EC2 x-large. Может я не правильно настроил свой код? Пожалуйста, смотрите мои результаты и код ниже.

ActiveMQ Send 3kb   
Average Time per Message (ns):  497276.1179
Average # Messages per second:  2010.935101
Total Time (s):                 49.72810906

ActiveMQ Recv 3kb   
Average Time per Message (ns):  43813.35476
Average # Messages per second:  22823.86285
Total Time (s):                 4.381379289

RabbitMQ Send 3kb   
Average Time per Message (ns):  1041524.626
Average # Messages per second:  960.1309229
Total Time (s):                 104.1524626

RabbitMQ Recv 3kb   
Average Time per Message (ns):  612559.3732
Average # Messages per second:  1632.494814
Total Time (s):                 61.25593732

Обновленные числа после удаления queueDeclare() в RabbitMQ Send.java и Recv.java:

Это значительно улучшило время RabbitMQ, но что-то должно быть не так, поскольку время потребления ActiveMQ составляет всего 4 секунды...

ActiveMQ Send 3kb   
Average Time per Message (ns):  491404.5666
Average # Messages per second:  2034.983124
Total Time (s):                 49.14045666

ActiveMQ Recv 3kb   
Average Time per Message (ns):  41976.17158
Average # Messages per second:  23823.03965
Total Time (s):                 4.197617158

RabbitMQ Send 3kb   
Average Time per Message (ns):  354795.8818
Average # Messages per second:  2818.522005
Total Time (s):                 35.47958818

RabbitMQ Recv 3kb   
Average Time per Message (ns):  440349.3892
Average # Messages per second:  2270.924009
Total Time (s):                 44.03493892

ActiveMQ Send.java

public class Send implements Runnable {

private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

public static void main(String[] argv) throws java.io.IOException {
    (new Thread(new Send())).start();
}

public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i=0; i <= NUMBER_OF_MESSAGES; i++){
            startTime = System.nanoTime();

            // 3kb
            String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..."

            TextMessage message = session.createTextMessage(text);

// Tell the producer to send the message
            //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;
            System.out.println(i + "," + Long.toString(stopTime-startTime));

        }

        // Clean up
        session.close();
        connection.close();

        //System.out.println("");
        //System.out.println("Total Time: " + totalTime + "ns");
        //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
        //System.out.println("");

    }
    catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
}

ActiveMQ Recv.java

public class Recv implements Runnable {

private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv)
    throws java.io.IOException {

    (new Thread(new Recv())).start();

}

public void run() {
    try {

        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);

        // Message Listener
        MyListener listener = new MyListener();
        consumer.setMessageListener(listener);

        // Wait for a message
        //Message message = consumer.receive(1000);

       // consumer.close();
       // session.close();

// connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}

public class MyListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            startTime = System.nanoTime();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime-startTime;

                System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

                numMessages++;

            } else {
                System.out.println("Received: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}

RabbitMQ Send.java

public class Send implements Runnable {

private final static String QUEUE_NAME = "hello";
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

// 3kb
private static final String message = "AMFu8UlKW2zJB..."

public static void main(String[] argv)
 throws java.io.IOException {

 (new Thread(new Send())).start();

}

public void run() {

try {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();

     for (int i=1; i <= NUMBER_OF_MESSAGES; i++){
         startTime = System.nanoTime();

         // No Persistence
         // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

         stopTime = System.nanoTime();
         totalTime = totalTime + stopTime-startTime;
         System.out.println(i + "," + Long.toString(stopTime-startTime));
     }

     channel.close();
     connection.close();

 } catch (Exception e) {
     e.printStackTrace();
 }
}
}

RabbitMQ Recv.java

private final static String QUEUE_NAME = "hello";
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv) {
    (new Thread(new Recv())).start();
}

public void run(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // No Persistence
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            startTime = System.nanoTime();
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;

            System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

            numMessages++;

        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

person littleK    schedule 02.10.2012    source источник
comment
Почему у вас нет импорта...   -  person MrMesees    schedule 26.01.2021


Ответы (1)


Ну, я посмотрел ваш код и отметки вашего бенчмарка, но только на пути Recv. Я видел, что цифры RabbitMq в два раза больше, чем ActiveMq. Потом я увидел исходники обоих, и что-то меня предупредило..

В исходном коде Rabbitqm Recv вы всегда делаете queuDeclare для каждого сообщения, если время связи является текущей большой задержкой, убедитесь, что двойное время из ActiveMq, чем Rabbitmq, происходит отсюда.

person pfreixes    schedule 03.10.2012
comment
Привет pfreixes, это отличное наблюдение. Я закомментирую объявление очереди и снова запущу тесты для уже объявленной очереди. Я обязательно сообщу о результатах... спасибо! - person littleK; 03.10.2012
comment
Я добавил свои новые результаты в исходный пост. Это действительно значительно улучшило время для RabbitMQ, который теперь показывает более быстрое время отправки, чем ActiveMQ. Однако я до сих пор не могу понять, почему время потребления ActiveMQ составляет всего 4 секунды для 100 000 сообщений. Я проверил фактические полученные сообщения, и они содержат правильное содержание. Любые другие предложения? - person littleK; 03.10.2012
comment
Да, у меня есть одно представление о том, что ваш ActiveMQ быстрее, чем один F1 :) ... Я думаю, вам следует изменить то, как вы берете общее время для получения одного сообщения. Тока вы получаете только процессорное время для запуска нескольких кодов операций, сообщение уже в памяти приемника!!! взгляните на свой код. В противном случае лучший способ проверить, какие накладные расходы связаны с одним промежуточным агентом, - это указать время начала в сообщении и получить его после получения из вашего Recv, чтобы оценить истинную стоимость отправки одного сообщения. - person pfreixes; 04.10.2012
comment
Еще одно отличное наблюдение и идея, я попробую это. Спасибо еще раз! - person littleK; 04.10.2012