В современных веб-приложениях довольно часто встречаются сценарии, в которых выполнение определенных задач может занять значительное время. Эти задачи часто называют «длительными заданиями», и они могут варьироваться от сложных вычислений, обработки изображений или данных до ресурсоемких операций, таких как создание отчетов. Эти задачи потенциально могут повлиять на скорость отклика приложения.

Таким образом, чтобы обеспечить удобство работы пользователей и быстроту отклика нашего бэкэнда, мы используем асинхронную обработку, введя очередь сообщений.

В этом блоге мы рассмотрим, как сделать наш серверный API Node.js асинхронным с помощью очереди сообщений RabbitMQ, тем самым повысив его производительность и масштабируемость.

Что такое очередь сообщений?

Очередь сообщений (он же Message Broker) — это, по сути, механизм связи, который позволяет различным компонентам распределенной системы обмениваться сообщениями асинхронно, обеспечивая плавную и эффективную обработку задач. Он действует как посредник, позволяя компонентам отправлять и получать сообщения без прямого взаимодействия друг с другом.

Позвольте мне упростить с интересным примером!

Представьте загруженную кухню ресторана, где шеф-повар является основным звеном, отвечающим за приготовление блюд. Однако шеф-повар не может принимать заказы напрямую от клиентов (другие компоненты) во время приготовления. Вместо этого официант (очередь сообщений) действует как посредник. Клиенты (компоненты) размещают свои заказы у официанта, который затем доставляет их шеф-повару в том порядке, в котором они были получены. Таким образом, шеф-повар может сосредоточиться на приготовлении пищи без перерывов, а заказы эффективно обрабатываются в порядке очереди.

Я надеюсь, что этот пример создал бы четкую картину роли очереди сообщений в распределенной системе.

В настоящее время на рынке существует несколько вариантов систем очереди сообщений, каждый из которых имеет свои особенности и сильные стороны. Лидерами высшей лиги являются:

  • Апач Кафка,
  • RabbitMQ,
  • Amazon Simple Queue Service (Amazon SQS),
  • Apache ActiveMQ и
  • Публикация/подписка Redis.

Внедрение долговременного API задания

Теперь давайте посмотрим, как мы можем использовать RabbitMQ для внедрения асинхронной обработки в наш серверный API на основе NodeJS и ExpressJS.

Вы можете посмотреть код проекта на GitHub.

Предпосылки:

Прежде чем мы начнем, убедитесь, что у вас установлено следующее:

  • Node.js: серверная часть будет разработана с использованием Node.js. Вы можете скачать его с официального сайта (https://nodejs.org) и установить в своей системе.
  • RabbitMQ:RabbitMQ — это программное обеспечение для обмена сообщениями с открытым исходным кодом. Загрузите и установите его с официального сайта (https://www.rabbitmq.com/download.html), следуя инструкциям по установке для вашей операционной системы.

Шаг 1. Настройка проекта Node.js

Начнем с создания нового проекта Node.js. Откройте терминал и выполните следующие команды:

mkdir node-rabbitmq-async-backend-api
cd node-rabbitmq-async-backend-api
npm init -y

Шаг 2: Установка зависимостей

Нам понадобятся некоторые библиотеки для работы с RabbitMQ и обработки асинхронных задач. Установите их с помощью npm:

npm install express amqplib
  • express: популярный веб-фреймворк для Node.js, который мы будем использовать для создания нашего API.
  • amqplib: клиент Node.js для RabbitMQ, позволяющий нам взаимодействовать с брокером сообщений.

Шаг 3: Создание Backend API

Теперь давайте создадим наш внутренний API. Создайте файл с именем index.js и вставьте следующий код:

// ========== index.js ==========
const express = require("express");
const { connectToQueue, publishToQueue } = require("./producer");

const app = express();
const port = 3000;
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

app.post("/process-job", async (req, res) => {
  try {
    // Job data as request body
    const jobData = req.body;

    // Connect to the message queue
    const channel = await connectToQueue();
    // Publish a job to the queue
    await publishToQueue(channel, jobData);

    res.status(202).json({ message: "Job added to the queue successfully." });
  } catch (error) {
    res.status(500).json({ error: "Failed to add job to the queue." });
  }
});

app.listen(port, () => {
  console.log(`Backend API listening at http://localhost:${port}`);
});

Шаг 4: Настройка RabbitMQ

Теперь давайте настроим производителя для очереди сообщений с функциями подключения и постановки заданий в очередь для обработки. Создайте файл с именем producer.js и вставьте следующий код:

// ========== producer.js ==========
const amqp = require("amqplib");

const QUEUE_NAME = "job_queue";
const RABBITMQ_URI = "amqp://localhost:5672";

let channel = null;

// Function to connect to the message queue
async function connectToQueue() {
  try {
    // If the channel doesn't exist, create a new one
    if (!channel) {
      const connection = await amqp.connect(RABBITMQ_URI);
      channel = await connection.createChannel();
      await channel.assertQueue(QUEUE_NAME, { durable: true });
    }
    return channel;
  } catch (error) {
    console.error("Error connecting to the message queue:", error);
    throw error;
  }
}

// Function to publish job to the message queue
async function publishToQueue(channel, jobData) {
  try {
    await channel.sendToQueue(
      QUEUE_NAME,
      Buffer.from(JSON.stringify(jobData)),
      {
        persistent: true,
      }
    );
    console.log("Job sent to the queue:", jobData);
  } catch (error) {
    console.error("Error publishing to the queue:", error);
    throw error;
  }
}

module.exports = {
  connectToQueue,
  publishToQueue,
};

Шаг 5: Использование сообщений из очереди

Последним шагом является создание потребителя (часто называемого "работником"), который будет получать сообщения из очереди сообщений и обрабатывать их. Создайте файл с именем consumer.js и добавьте следующий код:

// ========== consumer.js ==========
const amqp = require("amqplib");

const QUEUE_NAME = "job_queue";
const RABBITMQ_URI = "amqp://localhost:5672";

// Function to connect to the message queue
async function connectToQueue() {
  try {
    const connection = await amqp.connect(RABBITMQ_URI);
    const channel = await connection.createChannel();
    await channel.assertQueue(QUEUE_NAME, { durable: true });
    return channel;
  } catch (error) {
    console.error("Error connecting to the message queue:", error);
    throw error;
  }
}

// Function to process the job from the message queue
async function processJob(jobData) {
  // You can implement your logic here to process any long-running job.
  // For example, you can call external APIs, perform database operations, etc.
  // For now, let's simulate a delay to represent the long-running nature of the job.
  console.log("Processing job:", jobData);
  await new Promise((resolve) => setTimeout(resolve, 5000));

  console.log("Job processed successfully!");
}

// Function to start the worker, which will run independently 
// and consume message from the message queue
async function startWorker() {
  try {
    const channel = await connectToQueue();

    channel.consume(QUEUE_NAME, async (msg) => {
      const jobData = JSON.parse(msg.content.toString());
      await processJob(jobData);
      channel.ack(msg);
    });
  } catch (error) {
    console.error("Error starting the worker:", error);
  }
}

startWorker();

Примечание.
Для простоты этого проекта мы создали 3 файла в корневом каталоге проекта, чтобы продемонстрировать работу очереди сообщений. При создании проекта для реальных проектов старайтесь следовать рекомендациям по организации и структурированию проекта в нескольких каталогах и файлах.

Отличная работа! Вы успешно создали асинхронный серверный API с использованием Node.js и RabbitMQ. Теперь ваш API может эффективно обрабатывать длительные задачи, перекладывая их в очередь сообщений, повышая скорость отклика и масштабируемость.

Посмотрим, как это работает в действии

Выполните следующие шаги, чтобы запустить серверный API и рабочий процесс для наблюдения за выходными данными:

Шаг 1: Запустите сервер RabbitMQ

Перед запуском приложения убедитесь, что сервер RabbitMQ запущен и работает. Если вы установили RabbitMQ локально, вы можете запустить его, открыв командную строку RabbitMQ и выполнив следующую команду:

rabbitmq-server

Если у вас установлен «Плагин управления RabbitMQ», вы можете получить доступ к консоли управления на http://localhost:15672/.

Шаг 2: Запустите потребителя

Откройте новое окно терминала и перейдите в каталог вашего проекта. Запустите работника, выполнив следующую команду:

node consumer.js

Рабочий теперь будет активно прослушивать сообщения в файле job_queue. Если изначально сообщений нет, он будет ждать поступления новых задач.

Шаг 3. Запустите Backend API.

Откройте другое окно терминала и перейдите в каталог вашего проекта. Запустите внутренний API, выполнив следующую команду:

node index.js

Серверный API будет доступен по адресу http://localhost:3000.

Шаг 4. Поставьте в очередь длительное задание

Чтобы поставить в очередь длительное задание, используйте cURL или Postman для отправки запроса POST на http://localhost:3000/process-task.

Вы можете запустить команду cURL на новом терминале следующим образом:

curl --location '127.0.0.1:3000/process-job' \
--header 'Content-Type: application/json' \
--data '{"message": "Hello there! I am Atul Anand"}'

Вы также можете использовать Postman следующим образом:

API ответит сообщением JSON: { «message»: «Здравствуйте! Я Атул Ананд» }. Одновременно потребитель возьмет поставленную в очередь задачу и начнет ее обрабатывать.

Шаг 5: Наблюдайте за результатом

В терминале, где работает потребитель, вы увидите вывод, указывающий, что потребитель обрабатывает задачу. В демонстрационных целях логика обработки опущена в предоставленных фрагментах кода. В реальном сценарии вы бы обработали задачу на основе требований вашего приложения.

Вывод будет выглядеть примерно так:

Обратите внимание на асинхронное поведение на выходе потребителя. Задания (сообщения) обрабатываются с задержкой (которую мы смоделировали в нашем коде).

Вот и все! Вы успешно запустили серверный API и наблюдали за асинхронной обработкой длительной задачи с использованием очереди сообщений RabbitMQ.

Продолжая развивать свой проект, не забудьте добавить надежную обработку ошибок и другие оптимизации, чтобы сделать систему готовой к работе. Не стесняйтесь экспериментировать с различными типами задач и логикой обработки в соответствии с потребностями вашего приложения.

Пожалуйста, не забудьте поставить этому посту 50 хлопков 👏🏼 и подписаться на мой блог ❤️, если он вам понравился и вы хотите увидеть больше.
Удачного программирования и обучения! 🚀