JMS использует несколько тем

Я новичок в Java и работаю над проектом, который использует несколько (разных) тем и отправляет их на другой сервер. Мне было интересно, как лучше всего работать с несколькими темами.

Насколько я понимаю, каждый потребитель привязан к одной теме, поэтому, если бы мне пришлось использовать несколько тем, мне понадобился бы один потребитель для каждой отдельной темы. Поскольку потребитель делает блокирующий вызов, мне нужно будет вызвать поток для каждого потребителя, чтобы параллельно использовать эти темы.

И если я хочу еще больше повысить пропускную способность, будет ли хорошей практикой иметь один поток-босс для каждого потребителя (прикрепленный к теме) и разрешить каждому потоку-боссу настраивать рабочие потоки для соответствующего повышения производительности?

Пожалуйста, посоветуйте, является ли это хорошей практикой, а если нет, то каковы другие альтернативные варианты? И есть ли какие-либо известные шаблоны проектирования для решения этой проблемы?

Почему я выбрал модель потребителя вместо модели слушателя?

У меня есть еще одно ограничение: после того, как потребитель получит сообщение, ему необходимо отправить сообщение на другой принимающий сервер. Если принимающий сервер не работает (во время отправки новой версии), я должен приостановить прием сообщений до тех пор, пока принимающий сервер не включится. В этом случае наличие прослушивателя сообщений не поможет, потому что я не смогу приостановить прослушиватель, когда принимающий сервер не работает. Я прав, когда говорю это, или есть способ приостановить слушателя и прекратить потребление сообщений, пока принимающий сервер не включится?


person mariner    schedule 01.03.2016    source источник


Ответы (1)


Я бы сделал это, используя функцию прослушивания.

Ваш объект реализует интерфейс MessageListener, а затем вы добавляете к потребителю свой прослушиватель сообщений. В этом случае клиентская библиотека будет обрабатывать потоки для вас при чтении сообщений из очереди и отправке их слушателям.

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main() {
        try {

            MyMessageConsumer myMessageConsumer = new MyMessageConsumer();

            // This example is using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination1 = session.createQueue("MyTopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("MyTopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        // Handle my messages here
    }
}

Транзакции сеанса

В этом варианте мы используем транзакционные сообщения, и оно доставляет сообщение, если вызывается session.rollback(). Вы подтверждаете(), когда ваша операция была успешной, или выполняете откат(), когда это не так.

пакет io.bessel.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main(String ... arguments) {
        try {
            // This example is using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

            MyMessageConsumer myMessageConsumer = new MyMessageConsumer(session);

            Destination destination1 = session.createQueue("MyTopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("MyTopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    private final Session session;

    public MyMessageConsumer(Session session) {
        this.session = session;
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println(String.format("Received message: %s", text));
                this.session.rollback();

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}
person mookins    schedule 01.03.2016
comment
Я думал об использовании слушателя, я не упомянул об этом. У меня есть еще одно ограничение: после того, как потребитель получит сообщение, ему необходимо отправить сообщение на другой принимающий сервер. Если принимающий сервер не работает (во время отправки новой версии), я должен приостановить прием сообщений до тех пор, пока принимающий сервер не включится. В этом случае наличие прослушивателя сообщений не поможет, потому что я не смогу приостановить прослушиватель, когда принимающий сервер не работает. Я прав, когда говорю это, или есть способ приостановить слушателя и прекратить потребление сообщений, пока принимающий сервер не включится? Спасибо. - person mariner; 01.03.2016
comment
Добавлен второй пример кода для транзакционных сообщений. Будет повторно доставлять сообщение снова и снова, пока оно не будет подтверждено. Это где-то вводит накладные расходы (как и все), просто не уверен, где и будет ли это проблемой в вашем приложении, но это наиболее прямолинейно. - person mookins; 01.03.2016