Можно ли использовать один канал для связи с исполнителем одного потока в RabbitMQ?

Я пытаюсь взаимодействовать с сервером RabbitMQ, используя клиентский API RabbitMQ-java. Я прочитал из руководство по API клиента Java:

Как правило, следует избегать совместного использования экземпляров Channel между потоками. Приложения должны предпочесть использование канала для каждого потока, а не совместное использование одного и того же канала несколькими потоками.

Я пытаюсь использовать ThreadPoolExecutor с corePoolSize 1 и добавляю задачи Runnable для сохранения сообщений в очередях RabbitMQ. Вот код, который я использую:

package common;

import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class RabbitMQUtil {
    private static Logger log= LoggerFactory.getLogger("logger");
    private static RabbitMQUtil gmInstance;
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
    private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";
    private final Properties properties = new Properties();
    private String host = null;
    private int port = 0;
    private String username = null;
    private String password = null;
    private String useSSL = "false";
    private ConnectionFactory factory;
    private Connection connection;
    private Channel channel;

    private RabbitMQUtil() throws IOException, TimeoutException, Exception {
        try {
            InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME);
            if(stream != null) {
                properties.load(stream);
            }
        } catch (Exception ex) {
            log.error("Exception while loading the rabbitmq properties file:", ex);
        }

        host = properties.getProperty("rabbitmq.host", "localhost");
        port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
        username = properties.getProperty("rabbitmq.username", "guest");
        password = properties.getProperty("rabbitmq.password", "guest");
        useSSL = properties.getProperty("rabbitmq.usessl", "false");

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        if("true".equalsIgnoreCase(useSSL)) {
            try {
                factory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                log.error("Exception while applying the tls for rabbitmq:", e);
            }
        }
        connection = factory.newConnection();
        connection.addBlockedListener(new RabbitMQBlockedListener());
        connection.addShutdownListener(new RabbitMQShutDownListener());

        channel = connection.createChannel();
    }

    public static RabbitMQUtil getInstance() {
        if(gmInstance == null) {
            synchronized (RabbitMQUtil.class) {
                if(gmInstance == null) {
                    try {
                        gmInstance = new RabbitMQUtil();
                    } catch (IOException | TimeoutException e) {
                        log.error("Exception in getInstance:", e);
                    } catch (Exception e) {
                        log.error("Exception in getInstance:", e);
                    }
                }
            }
        }
        return gmInstance;
    }

    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
        log.info("data to be saved for :"+queueName+" is:"+obj.toString());
    }

    public void saveMsgInQueue(JsonObject gson, String queueName) {
        this.executor.execute(new RabbitMQData(gson, queueName));
    }

    private class RabbitMQBlockedListener implements BlockedListener {
        @Override
        public void handleBlocked(String arg0) throws IOException {
            log.warn("blocked listener called:", arg0);
        }

        @Override
        public void handleUnblocked() throws IOException {
            log.warn("unblocked listener called:");
        }
    }

    private class RabbitMQShutDownListener implements ShutdownListener {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            log.error("Shutdown event listener called:", cause);
            log.error("shutdown event listener:"+cause.isHardError());
        }
    }

    private class RabbitMQData implements Runnable{
        JsonObject obj;
        String queueName;
        public RabbitMQData(JsonObject obj, String queueName) {
            Thread.currentThread().setName("RabbitMQ Thread:"+obj.get("userid")+" -->"+queueName);
            this.obj = obj;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            try {
                channel.queueDeclare(this.queueName, true, false, false, null);
                channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes());
            } catch (Exception e) {
                log.info("Error while running the scheduled rabbitmq task:", e);
                log.info("data to be saved for :"+this.queueName+" is:"+this.obj.toString());
            }
        }
    }

    public static void saveRabbitMQData(JsonObject obj, String queueName) {
        RabbitMQUtil util = RabbitMQUtil.getInstance();
        if(util != null) 
            util.saveMsgInQueue(obj, queueName);
        else
            RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
    }
}

Я хотел бы знать следующие вещи:

  1. Можно ли использовать один канал, когда используется пул потоков только из 1 потока?
  2. Как должны обрабатываться объекты соединения и канала, когда заблокированы/разблокированы и заблокированы события? Хотя API автоматически устанавливает соединение, когда сервер RabbitMQ снова работает.

Любые другие отзывы будут оценены.

Спасибо


person Prateek    schedule 15.06.2017    source источник


Ответы (1)


1.- Можно ли использовать один канал, когда используется пул потоков только из 1 потока?

да, все в порядке. именно так вы должны это сделать. только один поток должен использовать экземпляр Channel. В противном случае подтверждения могут быть потеряны (см. здесь: https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html)

2.- Как следует обрабатывать объекты соединения и канала, когда инициируются события блокировки/разблокировки и отключения? Хотя API автоматически устанавливает соединение, когда сервер RabbitMQ снова работает.

когда приложение закрывается, вы должны закрыть канал, а затем закрыть соединение с RabbitMQ.

    channel.close();
    conn.close();

о блокировке/разблокировке читайте здесь (https://www.rabbitmq.com/api-guide.html):

Обратные вызовы для потребителей отправляются в пул потоков отдельно от потока, создавшего экземпляр своего канала. Это означает, что потребители могут безопасно вызывать методы блокировки для подключения или канала, такие как Channel#queueDeclare или Channel#basicCancel.

Каждый канал имеет свой собственный поток отправки. Для наиболее распространенного случая использования одного потребителя на канал это означает, что потребители не задерживают других потребителей. Если у вас есть несколько потребителей на канал, имейте в виду, что долго работающий потребитель может задерживать отправку обратных вызовов другим потребителям на этом канале.

person Jose Zevallos    schedule 15.06.2017
comment
Благодарю за ваш ответ. Мне пришлось использовать connection.abort() и channel.abort() в слушателе выключения. потому что закрытие вызывало исключение, а connection = null и channel = null в одном и том же слушателе. Я создаю новое соединение и канал, если они равны нулю перед отправкой сообщения на сервер rabbitmq. Является ли хорошей практикой использование той же стратегии в заблокированном слушателе? - person Prateek; 17.06.2017