Введение. Микросервисная архитектура стала популярным способом создания масштабируемых и надежных приложений. В этом руководстве мы рассмотрим, как создать микросервисную архитектуру с использованием .NET 7 и Kafka. Мы также затронем такие темы, как шлюз API, Orchestrator, контрольный журнал и согласованность данных. Мы предоставим образец кода для каждой микрослужбы, чтобы помочь вам начать работу.

Настройка инфраструктуры В этом разделе мы настроим инфраструктуру, необходимую для построения нашей микросервисной архитектуры. Мы будем использовать Docker и Docker Compose для настройки Kafka, API Gateway, Orchestrator и микросервисов. Мы также настроим каждую службу для связи с Kafka.

  • Создание микросервисов В этом разделе мы создадим три микросервиса: User Service, Product Service и Order Service. Каждый микросервис будет иметь свою собственную базу данных и будет связываться с Kafka для отправки и получения сообщений. Мы предоставим образец кода для каждого микросервиса.
  • Реализация шлюза API В этом разделе мы реализуем шлюз API с помощью Ocelot. Шлюз API будет получать запросы от клиентов и направлять их в соответствующий микросервис. Мы предоставим пример кода для настройки шлюза API для работы с микросервисами.
  • Реализация Orchestrator В этом разделе мы реализуем Orchestrator с помощью MassTransit. Orchestrator будет координировать взаимодействие между микрослужбами для обеспечения согласованности данных. Мы предоставим пример кода для настройки Orchestrator для работы с микрослужбами.
  • Внедрение журнала аудита В этом разделе мы реализуем журнал аудита с использованием Serilog. Журнал аудита будет записывать все действия, предпринятые в микросервисах, для целей отладки и анализа. Мы предоставим пример кода для настройки журнала аудита для работы с микросервисами.
  • Обеспечение согласованности данных В этом разделе мы обеспечим согласованность данных путем реализации шаблона Saga с помощью MassTransit. Шаблон Saga гарантирует, что все микросервисы, участвующие в транзакции, будут успешными или откатятся в случае сбоя одного из них. Мы предоставим пример кода для реализации шаблона Saga в наших микросервисах.

Настройка инфраструктуры

Во-первых, нам нужно настроить инфраструктуру, необходимую для построения нашей микросервисной архитектуры. Мы будем использовать Docker и Docker Compose для настройки Kafka, API Gateway, Orchestrator и микросервисов. Мы также настроим каждую службу для связи с Kafka.

Шаг 1. Установите Docker и Docker Compose

Если в вашей системе не установлены Docker и Docker Compose, следуйте инструкциям, приведенным в официальной документации, чтобы установить их.

Шаг 2. Создайте файл Docker Compose

Создайте новый файл с именем docker-compose.yml в корневом каталоге вашего проекта и добавьте следующий код:

version: '3.9'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - kafka-network
networks:
  kafka-network:
    driver: bridge

Этот файл docker-compose.yml создаст две службы: ZooKeeper и Kafka. ZooKeeper — это служба координации, используемая Kafka, а Kafka — это система обмена сообщениями, используемая нашими микросервисами для связи друг с другом. Мы также определяем сеть с именем kafka-network, которой принадлежат обе службы.

Шаг 3. Настройте микросервисы для связи с Kafka

В каждый микросервис нам нужно добавить клиентскую библиотеку Kafka и настроить ее для связи с только что созданным сервисом Kafka. Мы будем использовать библиотеку Confluent.Kafka, которая является .NET-клиентом для Kafka.

В микросервисе User добавьте в проект пакет Confluent.Kafka NuGet. Затем создайте новый файл с именем KafkaProducer.cs и добавьте следующий код:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

namespace UserService.Services
{
    public class KafkaProducer : IDisposable
    {
        private readonly IProducer<string, string> producer;
        public KafkaProducer(string bootstrapServers)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };
            producer = new ProducerBuilder<string, string>(config).Build();
        }
        public async Task ProduceAsync<T>(string topic, T value)
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = JsonConvert.SerializeObject(value)
            };
            await producer.ProduceAsync(topic, message);
        }
        public void Dispose()
        {
            producer.Dispose();
        }
    }
}

Этот класс KafkaProducer будет отвечать за отправку сообщений в Kafka. Мы создаем экземпляр IProducer<string, string>, который принимает объект ProducerConfig в качестве параметра. В методе ProduceAsync мы создаем новый объект Message<string, string> и сериализуем параметр value в формате JSON перед установкой его в качестве значения сообщения. Затем мы вызываем метод ProduceAsync экземпляра producer, чтобы отправить сообщение Kafka.

Затем создайте новый файл с именем KafkaConsumer.cs и добавьте следующий код:

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace UserService.Services
{
    public abstract class KafkaConsumer<T> : BackgroundService
    {
        private readonly ILogger logger;
        private readonly IConsumer<string, string> consumer;
        public KafkaConsumer(string bootstrapServers, string groupId, string topic, ILogger logger)
        {
            this.logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = groupId,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe(topic);
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    if (result != null && !result.IsPartitionEOF)
                    {
                        var value = JsonConvert.DeserializeObject<T>(result.Value);
                        await ConsumeAsync(value, stoppingToken);
                    }
                }
                catch (OperationCanceledException)
                {
                    logger.LogInformation("Kafka consumer has been cancelled.");
                    break;
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "An error occurred while consuming messages from Kafka.");
                }
            }
            consumer.Close();
        }
        public override void Dispose()
        {
            consumer.Dispose();
            base.Dispose();
        }
        protected abstract Task ConsumeAsync(T value, CancellationToken stoppingToken);
    }
}

Этот класс KafkaConsumer является абстрактным классом, который вы можете наследовать в своих микросервисах для использования сообщений от Kafka. Мы создаем экземпляр IConsumer<string, string>, который принимает объект ConsumerConfig в качестве параметра. В методе ExecuteAsync мы запускаем бесконечный цикл для получения сообщений от Kafka, используя метод Consume экземпляра consumer. Мы десериализуем значение сообщения в общий тип T и вызываем метод ConsumeAsync, который необходимо реализовать в микросервисе для обработки потребляемого сообщения.

Шаг 4. Добавьте службы API Gateway и Orchestrator

Добавьте в решение новый проект с именем ApiGateway и еще один проект с именем Orchestrator. В проекте ApiGateway добавьте следующий код в файл Program.cs:

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace ApiGateway
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}

В проекте Orchestrator добавьте следующий код в файл Program.cs:

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Orchestrator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}

Шаг 5. Добавьте конечные точки API в шлюз API

В проекте ApiGateway добавьте новый файл с именем Startup.cs и добавьте следующий код:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using UserService.Services;

namespace ApiGateway
{
    public class Startup
    {
        private readonly IConfiguration configuration;
        public Startup(IConfiguration configuration)
        {
            this.configuration = configuration;
        }
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            // Register UserServiceClient as a singleton service
            services.AddSingleton<IUserServiceClient>(new UserServiceClient(configuration["UserServiceUrl"]));
            // Register KafkaProducer as a singleton service
            services.AddSingleton<KafkaProducer<UserCreatedEvent>>(new KafkaProducer<UserCreatedEvent>(configuration["KafkaBootstrapServers"], "user-created"));
            // Register UserService as a singleton service
            services.AddSingleton<UserService>();
            // Register UserCreatedEventHandler as a singleton service
            services.AddSingleton<UserCreatedEventHandler>();
            // Register the API Gateway's event handler
            services.AddSingleton<ApiGatewayEventHandler>();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}

В методе ConfigureServices регистрируем следующие сервисы:

  • IUserServiceClient: одноэлементный экземпляр UserServiceClient, который мы используем для вызова API микросервиса UserService.
  • KafkaProducer<UserCreatedEvent>: одноэлементный экземпляр KafkaProducer, который мы используем для публикации сообщений в теме user-created в Kafka.
  • UserService: одноэлементный экземпляр UserService, который мы используем для обработки HTTP-запросов, отправляемых на конечную точку /users шлюза API.
  • UserCreatedEventHandler: одноэлементный экземпляр UserCreatedEventHandler, который мы используем для обработки сообщений UserCreatedEvent, полученных от Kafka.
  • ApiGatewayEventHandler: одноэлементный экземпляр ApiGatewayEventHandler, который мы используем для обработки событий UserCreated, полученных от UserCreatedEventHandler.

В методе Configure мы добавляем промежуточное ПО Controllers и сопоставляем конечные точки API с соответствующими контроллерами.

Шаг 6. Добавьте конечные точки API в микрослужбу UserService

В проекте UserService добавьте новую папку с именем Controllers и добавьте новый файл с именем UserController.cs. Добавьте следующий код в файл UserController.cs:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using UserService.Models;
using UserService.Services;

namespace UserService.Controllers
{
    [ApiController]
    [Route("users")]
    public class UserController : ControllerBase
    {
        private readonly IUserService userService;
        private readonly KafkaProducer<UserCreatedEvent> kafkaProducer;
        public UserController(IUserService userService, KafkaProducer<UserCreatedEvent> kafkaProducer)
        {
            this.userService = userService;
            this.kafkaProducer = kafkaProducer;
        }
        [HttpPost]
        public async Task<IActionResult> CreateUser(CreateUserRequest request)
        {
            var user = await userService.CreateUserAsync(request);
            var userCreatedEvent = new UserCreatedEvent(user.Id, user.Name, user.Email);
            await kafkaProducer.ProduceAsync(userCreatedEvent);
            return CreatedAtAction(nameof(GetUser), new { id = user.Id }, user);
        }
        [HttpGet("{id}")]
        public async Task<IActionResult> GetUser(int id)
        {
            var user = await userService.GetUserAsync(id);
            if (user == null)
            {
                return NotFound();
            }
            return Ok(user);
        }
    }
}

В классе UserController мы определяем две конечные точки HTTP:

  • Сообщение `POST /users: creates a new user and publishes a UserCreatedEvent` для Kafka.
  • GET /users/{id}: извлекает существующего пользователя по идентификатору.

В методе CreateUser мы сначала вызываем метод userService.CreateUserAsync для создания нового пользователя. Затем мы создаем новый экземпляр UserCreatedEvent и публикуем его в Kafka с помощью метода kafkaProducer.ProduceAsync. Наконец, мы возвращаем ответ 201 Created с данными вновь созданного пользователя.

В методе GetUser мы вызываем метод userService.GetUserAsync для получения существующего пользователя по идентификатору. Если пользователь не существует, мы возвращаем ответ 404 Not Found. В противном случае мы возвращаем ответ 200 OK с данными пользователя.

Шаг 7. Добавьте обработчики событий в микрослужбу UserService

В проекте UserService добавьте новую папку с именем EventHandlers и добавьте новый файл с именем UserCreatedEventHandler.cs. Добавьте следующий код в файл UserCreatedEventHandler.cs:

using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using UserService.Models;
using UserService.Services;

namespace UserService.EventHandlers
{
    public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
    {
        private readonly IUserService userService;
        private readonly ILogger<UserCreatedEventHandler> logger;
        public UserCreatedEventHandler(IUserService userService, ILogger<UserCreatedEventHandler> logger)
        {
            this.userService = userService;
            this.logger = logger;
        }
        public async Task HandleAsync(UserCreatedEvent message, ConsumeResult<string, string> consumeResult)
        {
            logger.LogInformation($"Received UserCreatedEvent message: {message}");
            // Save the user data to the database
            var user = new User
            {
                Id = message.UserId,
                Name = message.UserName,
                Email = message.UserEmail
            };
            await userService.CreateUserAsync(user);
            logger.LogInformation($"User with ID {message.UserId} created successfully");
        }
    }
}

В классе UserCreatedEventHandler мы реализуем интерфейс IEventHandler<UserCreatedEvent>, который определяет единственный метод HandleAsync, который вызывается KafkaConsumer при получении нового сообщения UserCreatedEvent от Kafka.

В методе HandleAsync мы сначала регистрируем полученное сообщение, используя экземпляр ILogger. Затем мы создаем новый экземпляр User из данных полученного сообщения и вызываем метод userService.CreateUserAsync для сохранения пользователя в базе данных. Наконец, мы регистрируем сообщение об успехе с идентификатором пользователя.

Шаг 8. Добавьте обработчики событий в микрослужбу ApiGateway

В проекте ApiGateway добавьте новую папку с именем EventHandlers и добавьте новый файл с именем ApiGatewayEventHandler.cs. Добавьте следующий код в файл ApiGatewayEventHandler.cs:

using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using UserService.Events;

namespace ApiGateway.EventHandlers
{
    public class ApiGatewayEventHandler : IEventHandler<UserCreated>
    {
        private readonly ILogger<ApiGatewayEventHandler> logger;
        public ApiGatewayEventHandler(ILogger<ApiGatewayEventHandler> logger)
        {
            this.logger = logger;
        }
        public Task HandleAsync(UserCreated message, ConsumeResult<string, string> consumeResult)
        {
            logger.LogInformation($"Received UserCreated event: {message}");
            // TODO: Implement custom event handling logic here
            return Task.CompletedTask;
        }
    }
}

В классе ApiGatewayEventHandler мы реализуем интерфейс IEventHandler<UserCreated>, который определяет одно событие HandleAsyncmethod that is called by theKafkaConsumerwhen a newUserCreated, полученное от Kafka.

В методе HandleAsync мы сначала регистрируем полученное событие, используя экземпляр ILogger. Затем мы можем реализовать пользовательскую логику обработки событий в соответствии с требованиями нашего приложения.

Шаг 9. Добавьте контрольный журнал в микрослужбу UserService

В проекте UserService добавьте новую папку с именем AuditTrail и добавьте новый файл с именем UserAuditTrail.cs. Добавьте следующий код в файл UserAuditTrail.cs:

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using UserService.Models;
using UserService.Services;

namespace UserService.AuditTrail
{
    public class UserAuditTrail : IUserAuditTrail
    {
        private readonly IUserService userService;
        private readonly ILogger<UserAuditTrail> logger;
        public UserAuditTrail(IUserService userService, ILogger<UserAuditTrail> logger)
        {
            this.userService = userService;
            this.logger = logger;
        }
        public async Task LogUserCreated(User user)
        {
            logger.LogInformation($"User with ID {user.Id} created");
            // TODO: Save the audit trail to a database or other storage mechanism
        }
    }
}

В классе UserAuditTrail мы реализуем интерфейс IUserAuditTrail, который определяет единственный метод LogUserCreated, который вызывается классом UserService при создании нового пользователя.

В методе LogUserCreated мы сначала регистрируем сообщение о том, что пользователь был создан с использованием экземпляра ILogger. Затем мы можем сохранить контрольный журнал в базе данных или другом механизме хранения в зависимости от требований нашего приложения.

Шаг 10. Обеспечьте согласованность данных с помощью координатора распределенных транзакций

В архитектуре микросервисов поддержание согласованности данных может быть сложной задачей, особенно когда несколько сервисов должны изменять одни и те же данные. Одним из решений этой проблемы является использование координатора распределенных транзакций, такого как Dapper или Entity Framework.

В проекте UserService мы будем использовать Entity Framework для обеспечения согласованности данных. Во-первых, нам нужно добавить в проект пакет Microsoft.EntityFrameworkCore NuGet.

Далее мы изменим UserService, чтобы использовать Entity Framework вместо класса UserRepository. Добавьте новый файл с именем ApplicationDbContext.cs в проект UserService и добавьте следующий код:

using Microsoft.EntityFrameworkCore;
using UserService.Models;

namespace UserService.Data
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
            : base(options)
        {
        }
        public DbSet<User> Users { get; set; }
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            modelBuilder.Entity<User>()
                .HasKey(u => u.Id);
        }
    }
}

В классе ApplicationDbContext мы определяем свойство DbSet<User> и переопределяем метод OnModelCreating для настройки первичного ключа объекта User.

Далее мы изменим UserService для использования Entity Framework. Замените метод CreateUserAsync в файле UserService.cs следующим кодом:

public async Task<User> CreateUserAsync(User user)
{
    using var transaction = await dbContext.Database.BeginTransactionAsync();

    try
    {
        await dbContext.Users.AddAsync(user);
        await dbContext.SaveChangesAsync();

        await auditTrail.LogUserCreated(user);

        var userCreatedEvent = new UserCreatedEvent
        {
            UserId = user.Id,
            UserName = user.Name,
            UserEmail = user.Email
        };
        await kafkaProducer.ProduceAsync("user-created", userCreatedEvent);

        await transaction.CommitAsync();

        return user;
    }
    catch (Exception ex)
    {
        await transaction.RollbackAsync();
        logger.LogError(ex, "Error creating user");
        throw;
    }
}

В методе CreateUserAsync мы сначала начинаем новую транзакцию, используя метод dbContext.Database.BeginTransactionAsync. Затем мы добавляем нового пользователя в таблицу Users с помощью Entity Framework и сохраняем изменения с помощью метода dbContext.SaveChangesAsync.

Затем мы регистрируем создание пользователя с помощью службы UserAuditTrail, создаем событие UserCreated с помощью производителя Kafka и фиксируем транзакцию с помощью метода transaction.CommitAsync.

Если во время транзакции возникает исключение, мы откатываем изменения с помощью метода transaction.RollbackAsync, регистрируем ошибку с помощью экземпляра ILogger и повторно выбрасываем исключение.

Шаг 11. Запустите микросервисы

Чтобы запустить микросервисы, нам нужно запустить следующие сервисы по порядку:

  1. Запустите сервер Кафки
  2. Запустите службу контрольного журнала
  3. Запустите службу пользователей
  4. Запустите ApiGateway

Чтобы запустить сервер Kafka, вы можете следовать документации Kafka для своей платформы.

Чтобы запустить службу контрольного журнала, перейдите в каталог проекта AuditTrailService и выполните следующую команду:

dotnet run

Чтобы запустить UserService, перейдите в каталог проекта UserService и выполните следующую команду:

dotnet run

Чтобы запустить ApiGateway, перейдите в каталог проекта ApiGateway и выполните следующую команду:

dotnet run

Шаг 12. Тестирование микросервисов

Чтобы протестировать микросервисы, вы можете использовать такой инструмент, как Postman или curl, для отправки HTTP-запросов на ApiGateway. Ниже приведены примеры запросов, которые вы можете отправить:

  1. Создайте нового пользователя:
curl -X POST -H "Content-Type: application/json" \
    -d '{"name": "John Doe", "email": "[email protected]"}' \
    http://localhost:5000/api/users

2. Получить всех пользователей:

curl http://localhost:5000/api/users

3. Получить конкретного пользователя по ID:

curl http://localhost:5000/api/users/1

Заключение

В этом руководстве мы продемонстрировали, как построить архитектуру микросервисов с использованием .Net 7, API-шлюза, Kafka, оркестратора, контрольного журнала и согласованности данных. Мы внедрили микрослужбу UserService, которая позволяет нам создавать и извлекать пользователей, и продемонстрировали, как использовать Kafka для создания и использования событий, журнал аудита для регистрации событий и Entity Framework для обеспечения согласованности данных.