Используйте возможности Nameko для создания масштабируемых и надежных микросервисных систем.

В этой статье мы узнаем о Nameko и его возможностях как микросервисной инфраструктуры.

Что такое Намеко?

Nameko — это платформа для создания легковесных, хорошо масштабируемых и отказоустойчивых сервисов на Python в соответствии с архитектурой микросервисов.

Он поставляется со встроенной поддержкой:

  • RPC через AMQP
  • Асинхронные события (pub-sub) через AMQP

Почему Намеко?

Nameko позволяет создать службу, которая может отвечать на RPC-сообщения, отправлять события при определенных действиях и прослушивать события от других служб. У него также могут быть HTTP-интерфейсы для клиентов, не говорящих на AMQP.

Давайте создадим базовый сервис Nameko и поэкспериментируем с его возможностями.

Настройка базовой среды

Во-первых, вам понадобится Докер установлен. Мы будем использовать Python 3, поэтому убедитесь, что он у вас тоже установлен.
Чтобы запустить Nameko, нам понадобится RabbitMQ. Он будет отвечать за связь между нашими службами Nameko.

Установить

$ pip install nameko 

Запустите контейнер RabbitMQ

$ docker run -p 5672:5672 — hostname nameko-rabbitmq rabbitmq:3

Привет, мир в Намеко

Служба Nameko — это просто класс Python. Класс инкапсулирует логику в своих методах и объявляет любые зависимости как атрибуты.

Идите вперед и создайте файл с именем Service.py со следующим содержимым:

from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"

Запустим наш пример. Если у вас запущен RabbitMQ, просто запустите:

$ nameko run Service

Nameko реализует автоматическое обнаружение службы, то есть при вызове метода RPC через AMQP Nameko попытается найти соответствующую службу RabbitMQ самостоятельно.

Чтобы протестировать наш сервис, запустите следующую команду в другом терминале:

>>> n.rpc.service.receive_event(event={‘message’: ‘Hello World!!’})

При вызове точки входа RPC создается рабочий процесс Nameko. Рабочий процесс — это просто экземпляр класса службы без сохранения состояния, что делает его по своей сути потокобезопасным. Максимальное количество рабочих по умолчанию для службы установлено на 10.

Подробнее о работниках Nameko читайте здесь.

Если максимальное количество воркеров установлено на 1, то одновременно будет выполняться только 1 воркер Nameko, т. е. он будет вести себя как обычная очередь.

Как установить связь между двумя службами Nameko

Для связи от одной службы Nameko к другой и наоборот, Nameko предоставляет конструкцию RpcProxy. Вот как вы его используете:

from nameko.rpc import rpc, RpcProxy

class SenderService:
    name = "sender_service"
    receiver_service_proxy = RpcProxy("receiver_service")

    @rpc
    def send_event(self, event):
        return self.receiver_service_proxy.receive_event({'message': 'Hello World!!'})

class ReceiverService:
    name = "receiver_service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"

Связь между службами Nameko и службами, не относящимися к Nameko

Будут сценарии, в которых нам нужно вызвать службу Nameko из чего-то, что не является службой Nameko, например из службы API или задания cron. Вот как это сделать:

from nameko.standalone.rpc import ClusterRpcProxy

AMQP_URI = "pyamqp://user:paswword@hostname"

config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})

параллелизм

Nameko построен на основе библиотеки eventlet, которая обеспечивает параллелизм через greenthreads.

Зеленые потоки, в отличие от потоков ОС, совместно уступают друг другу, а не заранее планируются ОС. Такое поведение оказывается выгодным, когда служба интенсивно использует операции ввода-вывода.

Один зеленый поток передает управление только тогда, когда он занят вводом-выводом, давая возможность другому зеленому потоку выполниться, что позволяет службе использовать общие структуры данных без необходимости использования блокировок и других механизмов синхронизации.

Давайте поэкспериментируем с параллелизмом Nameko на практике, изменив приведенный выше код:

from time import sleep
from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        sleep(5)
        return f"Event Received: {event}"

Мы используем sleep из модуля time, который является блокирующим вызовом. Однако при запуске наших сервисов с использованием nameko run nameko автоматически исправит блокирующие вызовы для неблокирующих вызовов, таких как sleep(5), то есть сделает их асинхронными.

Время отклика на один RPC-вызов нашего сервиса составит 5 секунд. Теперь, если мы сделаем 10 вызовов за один раз на один и тот же RPC, сколько времени потребуется, чтобы получить ответ на все 10 вызовов?

Давайте запустим следующий код в оболочке nameko:

def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        response = n.rpc.service.receive_event({'message': f'Worker {i+1}'})
        responses.append(response)

    for response in responses:
        print(response.result)

    end_time = time.perf_counter()

    print(f'Total Time: {round(end_time-start_time, 3)}')

time_concurrent_invocations()

Этот пример выполняется всего за пять секунд. Каждый воркер будет заблокирован в ожидании завершения вызова sleep, но это не помешает запуститься другому воркеру, неявно уступая в действии.

Если вы измените num_concurrent_calls = 20 в приведенном выше фрагменте, выполнение завершится через 10 секунд.

Асинхронный паб-саб

Предположим, теперь нам нужно выполнить асинхронную задачу, например отправить уведомление или загрузить файл в облако:

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class MessageService:

    name = "message_service"

    dispatch = EventDispatcher()


    def time_consuming_function(self, payload):
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):

        if event['payload']:
            self.time_consuming_function(event['payload'])

        print(f'Message Received: {event['message']}')



class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        pass

Когда receive_message обрабатывает событие с полезной нагрузкой, он вызывает time_consuming_function, который использует EventDispatcher для асинхронной обработки полезной нагрузки, вызывая time_consuming_event_handler в отдельном зеленом потоке. Вызывающий поток здесь не ждет, пока обработчик событий вернет ответ, тем самым позволяя вызывающему потоку быстрее завершить свое выполнение и принять дальнейшие запросы.

Масштабируемость

Мы использовали только один сервер и запускали один экземпляр RabbitMQ. В производственной среде вы захотите произвольно увеличить количество узлов, на которых запущена служба, которая получает слишком много вызовов.

Чтобы смоделировать масштабирование сервиса, давайте вернемся к нашему сервису из раздела параллелизма. Откройте другой терминал и запустите службу, как и раньше, используя nameko run Service. Это запустит еще один экземпляр службы с возможностью запуска еще десяти рабочих процессов. Теперь попробуйте снова запустить этот фрагмент с num_concurrent_calls = 20. Теперь для запуска снова потребуется пять секунд. Когда запущено более одного экземпляра службы, Nameko будет циклически распределять запросы RPC среди доступных экземпляров.

На самом деле вы можете настроить эти службы таким образом, чтобы они могли работать на совершенно разных машинах и масштабировать их независимо. Все, что вам нужно сделать, это указать эти службы на одного и того же брокера RabbitMQ.

Создайте файл конфигурации с URI брокера:

# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/

Запустите эти службы на разных машинах, используя:

$ nameko run <nameko_service> --config config.yaml

Отказоустойчивой

Nameko обладает высокой надежностью и отказоустойчивостью, поэтому он продолжает работать должным образом в случае сбоя одного или нескольких узлов в сервисном кластере до тех пор, пока хотя бы один работоспособный узел не останется в рабочем состоянии.

Попробуйте запустить 3 экземпляра службы и выполнить тестовый фрагмент с num_concurrent_calls = 50. Как только вы выполните тестовый фрагмент, убейте один или два экземпляра Service. Пропущенные сообщения будут перенаправлены на исправные узлы, что позволит избежать потери сообщений.

Такое поведение связано с тем, что сообщения подтверждаются после успешного завершения рабочего процесса, и если соединение теряется после доставки, но до подтверждения, RabbitMQ восстанавливает и повторно доставляет сообщение.

Что произойдет, если сервер RabbitMQ умрет, а в очереди останутся сообщения?

Nameko устанавливает delivery_mode=PERSISTENT по умолчанию для очередей, которые он создает для RPC через AMQP. Это говорит RabbitMQ сохранять сообщения на диск.

Однако существует короткий промежуток времени, когда RabbitMQ принял сообщение, но еще не сохранил его, что означает, что гарантии сохранения ненадежны.

Чтобы решить эту проблему, Nameko по умолчанию использует подтверждение издателя. Подтверждения снижают производительность, но гарантируют, что сообщения не будут потеряны.

Заключение

Nameko предназначен для создания систем с использованием микрослужб и масштабирования от одного экземпляра одной службы до кластера с множеством экземпляров множества различных служб.

Чтобы узнать больше о Nameko, ознакомьтесь с Документацией Nameko и присоединитесь к Обсуждению Nameko.