Възползвайте се от възможностите на Nameko за изграждане на мащабируеми, стабилни системи за микросервизи

В тази статия ще научим за Nameko и неговите възможности като рамка за микросервизи.

Какво е Nameko?

Nameko е рамка за изграждане на леки, силно мащабируеми и устойчиви на грешки услуги в Python, следвайки дизайн на архитектура на микроуслуга.

Предлага се с вградена поддръжка за:

  • RPC през AMQP
  • Асинхронни събития (pub-sub) през AMQP

Защо Nameko?

Nameko ви позволява да изградите услуга, която може да отговаря на RPC съобщения, да изпраща събития за определени действия и да слуша събития от други услуги. Може също да има HTTP интерфейси за клиенти, които не могат да говорят AMQP.

Нека създадем основна услуга Nameko и да експериментираме с нейните възможности.

Настройка на основната среда

Първо, ще ви трябва Docker инсталиран. Ще използваме Python 3, така че се уверете, че и него сте инсталирали.
За да стартираме Nameko, имаме нужда от RabbitMQ. Той ще отговаря за комуникацията между нашите услуги Nameko.

Инсталирай

$ pip install nameko 

Стартирайте RabbitMQ контейнер

$ docker run -p 5672:5672 — hostname nameko-rabbitmq rabbitmq:3

Здравей свят в Nameko

Услугата Nameko е просто клас на Python. Класът капсулира логиката в своите методи и декларира всички зависимости като атрибути.

Продължете и създайте файл, наречен Service.py със следното съдържание:

from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"

Нека изпълним нашия пример. Ако стартирате RabbitMQ, просто стартирайте:

$ nameko run Service

Nameko прилага автоматично откриване на услуга, което означава, че когато извиква RPC метод през AMQP, Nameko ще се опита да намери съответната услуга RabbitMQ сама.

За да тествате нашата услуга, изпълнете командата по-долу в друг терминал:

>>> n.rpc.service.receive_event(event={‘message’: ‘Hello World!!’})

Когато се извика RPC входна точка, се създава Nameko worker. Worker е просто екземпляр без състояние на сервизния клас, което го прави по своята същност безопасен за нишки. Максималният брой работници по подразбиране за услуга е зададен на 10.

„Прочетете повече за Nameko Workers тук“.

Ако максималният брой работници е зададен на 1, тогава само 1 Nameko работник ще се изпълнява в даден момент, т.е. ще се държи като обикновена опашка.

Как да комуникирате между 2 услуги на Nameko

За да комуникира от една услуга на Nameko към друга и обратно, Nameko предоставя конструкция RpcProxy. Ето как да го използвате:

from nameko.rpc import rpc, RpcProxy

class SenderService:
    name = "sender_service"
    receiver_service_proxy = RpcProxy("receiver_service")

    @rpc
    def send_event(self, event):
        return self.receiver_service_proxy.receive_event({'message': 'Hello World!!'})

class ReceiverService:
    name = "receiver_service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"

Комуникация между услуга Nameko и услуга, която не е Nameko

Ще има сценарии, при които трябва да извикаме услуга Nameko от нещо, което не е услуга Nameko, като API услуга или cron задача. Ето как да го направите:

from nameko.standalone.rpc import ClusterRpcProxy

AMQP_URI = "pyamqp://user:paswword@hostname"

config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})

Паралелност

Nameko е изграден върху библиотеката eventlet, която осигурява едновременност чрез greenthreads.

Зелените нишки, за разлика от нишките на ОС, съвместно се поддават една на друга, вместо да бъдат предварително планирани от ОС. Това поведение се оказва изгодно, когато услугата е натоварена с I/O.

Една зелена нишка дава контрол само когато е заета с извършване на входно-изходни операции – като дава възможност на друга зелена нишка да се изпълни, като по този начин позволява на услугата да използва споделени структури от данни, без да е необходимо да използва ключалки и други механизми за синхронизиране.

Нека експериментираме на практика с паралелността на Nameko, като модифицираме горния код:

from time import sleep
from nameko.rpc import rpc

class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        sleep(5)
        return f"Event Received: {event}"

Използваме sleep от модула за време, което е блокиращо повикване. Въпреки това, когато изпълняваме нашите услуги с помощта на nameko run, nameko автоматично ще коригира блокиращите повиквания към неблокиращи повиквания като sleep(5), т.е. ще го направи асинхронен.

Времето за отговор на едно RPC повикване към нашата услуга ще бъде 5 секунди. Сега, ако направим 10 повиквания наведнъж към един и същ RPC, колко време ще отнеме, за да получим отговор на всички 10 повиквания?

Нека изпълним следния код в nameko shell:

def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        response = n.rpc.service.receive_event({'message': f'Worker {i+1}'})
        responses.append(response)

    for response in responses:
        print(response.result)

    end_time = time.perf_counter()

    print(f'Total Time: {round(end_time-start_time, 3)}')

time_concurrent_invocations()

Този пример се изпълнява само за около пет секунди. Всеки работник ще бъде блокиран в очакване извикването за заспиване да приключи, но това не спира друг работник да започне, имплицитно отстъпвайки в действие.

Ако промените num_concurrent_calls = 20 в горния фрагмент, изпълнението ще приключи след 10 секунди.

Асинхронен Pub-Sub

Да предположим, че сега трябва да извършим асинхронна задача като изпращане на известие или качване на файл в облака:

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class MessageService:

    name = "message_service"

    dispatch = EventDispatcher()


    def time_consuming_function(self, payload):
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):

        if event['payload']:
            self.time_consuming_function(event['payload'])

        print(f'Message Received: {event['message']}')



class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        pass

Когато receive_message обработва събитие с полезен товар, той извиква time_consuming_function, който използва EventDispatcher за обработка на полезния товар по асинхронен начин чрез извикване на time_consuming_event_handler в отделна зелена нишка. Извикващата нишка тук не чака манипулаторът на събития да върне отговор, като по този начин позволява на извикващата нишка да завърши изпълнението си по-бързо и да приеме допълнителни заявки.

Мащабируем

Използваме само един сървър и изпълняваме едно копие на RabbitMQ. В производствена среда ще искате произволно да увеличите броя на възлите, изпълняващи услугата, която получава твърде много обаждания.

За да симулираме мащабиране на услугата, нека прегледаме нашата услуга от раздела за едновременност. Отворете друг терминал и стартирайте услугата както преди, като използвате nameko run Service. Това ще стартира друг екземпляр на услуга с потенциал да стартира още десет работни. Сега опитайте отново да стартирате този фрагмент с num_concurrent_calls = 20. Сега отново трябва да отнеме пет секунди, за да стартирате. Когато има повече от един работещи екземпляри на услугата, Nameko ще обвърже RPC заявките сред наличните екземпляри.

Всъщност можете да конфигурирате тези услуги по такъв начин, че да могат да работят на напълно различни машини и да ги мащабирате независимо. Всичко, което трябва да направите, е да насочите тези услуги към същия RabbitMQ брокер.

Създайте конфигурационен файл с URI на брокера:

# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/

Стартирайте тези услуги на различни машини, като използвате:

$ nameko run <nameko_service> --config config.yaml

Устойчив на грешки

Nameko е много здрав и устойчив на грешки, така че продължава да работи правилно в случай на повреда на един или повече възли в сервизния клъстер, докато поне един здрав възел остане да функционира.

Опитайте да стартирате 3 екземпляра на услугата и изпълнете тестовия фрагмент с num_concurrent_calls = 50. Веднага след като изпълните тестовия фрагмент, убийте едно или 2 копия на Service. Пропуснатите съобщения ще бъдат пренасочени към здрави възли, като по този начин се избягва загубата на съобщения.

Това поведение се дължи на факта, че съобщенията се ack’d след като изпълнението на работния файл завърши успешно и ако връзката се загуби след доставката, но преди потвърждението, RabbitMQ ще изиска отново и предаде съобщението.

Какво се случва, ако сървърът RabbitMQ умре и има останали съобщения в опашката?

Nameko задава delivery_mode=PERSISTENT по подразбиране за опашките, които създава за RPC през AMQP. Това казва на RabbitMQ да запише съобщенията на диск.

Има обаче кратък времеви прозорец, когато RabbitMQ е приел съобщение, но все още не го е запазил, което означава, че гаранциите за постоянство не са силни.

За да разреши това, Nameko използва по подразбиране „потвърждаване от издателя“. Потвържденията имат наказание за производителност, но гарантират, че съобщенията не се губят.

Заключение

Nameko е проектиран да ви помогне да изградите системи, използващи микроуслуги и мащабиране от един екземпляр на една услуга до клъстер с много екземпляри на много различни услуги.

За да научите повече за Nameko, вижте Documentation Nameko и се присъединете към Discourse Nameko.