Введение. Микросервисная архитектура стала популярным способом создания масштабируемых и надежных приложений. В этом руководстве мы рассмотрим, как создать микросервисную архитектуру с использованием .NET 7 и Kafka. Мы также затронем такие темы, как шлюз API, Orchestrator, контрольный журнал и согласованность данных. Мы предоставим образец кода для каждой микрослужбы, чтобы помочь вам начать работу.
Настройка инфраструктуры В этом разделе мы настроим инфраструктуру, необходимую для построения нашей микросервисной архитектуры. Мы будем использовать Docker и Docker Compose для настройки Kafka, API Gateway, Orchestrator и микросервисов. Мы также настроим каждую службу для связи с Kafka.
- Создание микросервисов В этом разделе мы создадим три микросервиса: User Service, Product Service и Order Service. Каждый микросервис будет иметь свою собственную базу данных и будет связываться с Kafka для отправки и получения сообщений. Мы предоставим образец кода для каждого микросервиса.
- Реализация шлюза API В этом разделе мы реализуем шлюз API с помощью Ocelot. Шлюз API будет получать запросы от клиентов и направлять их в соответствующий микросервис. Мы предоставим пример кода для настройки шлюза API для работы с микросервисами.
- Реализация Orchestrator В этом разделе мы реализуем Orchestrator с помощью MassTransit. Orchestrator будет координировать взаимодействие между микрослужбами для обеспечения согласованности данных. Мы предоставим пример кода для настройки Orchestrator для работы с микрослужбами.
- Внедрение журнала аудита В этом разделе мы реализуем журнал аудита с использованием Serilog. Журнал аудита будет записывать все действия, предпринятые в микросервисах, для целей отладки и анализа. Мы предоставим пример кода для настройки журнала аудита для работы с микросервисами.
- Обеспечение согласованности данных В этом разделе мы обеспечим согласованность данных путем реализации шаблона Saga с помощью MassTransit. Шаблон Saga гарантирует, что все микросервисы, участвующие в транзакции, будут успешными или откатятся в случае сбоя одного из них. Мы предоставим пример кода для реализации шаблона Saga в наших микросервисах.
Настройка инфраструктуры
Во-первых, нам нужно настроить инфраструктуру, необходимую для построения нашей микросервисной архитектуры. Мы будем использовать Docker и Docker Compose для настройки Kafka, API Gateway, Orchestrator и микросервисов. Мы также настроим каждую службу для связи с Kafka.
Шаг 1. Установите Docker и Docker Compose
Если в вашей системе не установлены Docker и Docker Compose, следуйте инструкциям, приведенным в официальной документации, чтобы установить их.
Шаг 2. Создайте файл Docker Compose
Создайте новый файл с именем docker-compose.yml
в корневом каталоге вашего проекта и добавьте следующий код:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:6.1.1 container_name: zookeeper ports: - "2181:2181" networks: - kafka-network kafka: image: confluentinc/cp-kafka:6.1.1 container_name: kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 networks: - kafka-network networks: kafka-network: driver: bridge
Этот файл docker-compose.yml
создаст две службы: ZooKeeper и Kafka. ZooKeeper — это служба координации, используемая Kafka, а Kafka — это система обмена сообщениями, используемая нашими микросервисами для связи друг с другом. Мы также определяем сеть с именем kafka-network
, которой принадлежат обе службы.
Шаг 3. Настройте микросервисы для связи с Kafka
В каждый микросервис нам нужно добавить клиентскую библиотеку Kafka и настроить ее для связи с только что созданным сервисом Kafka. Мы будем использовать библиотеку Confluent.Kafka, которая является .NET-клиентом для Kafka.
В микросервисе User
добавьте в проект пакет Confluent.Kafka NuGet. Затем создайте новый файл с именем KafkaProducer.cs
и добавьте следующий код:
using System; using System.Threading.Tasks; using Confluent.Kafka; using Newtonsoft.Json; namespace UserService.Services { public class KafkaProducer : IDisposable { private readonly IProducer<string, string> producer; public KafkaProducer(string bootstrapServers) { var config = new ProducerConfig { BootstrapServers = bootstrapServers }; producer = new ProducerBuilder<string, string>(config).Build(); } public async Task ProduceAsync<T>(string topic, T value) { var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = JsonConvert.SerializeObject(value) }; await producer.ProduceAsync(topic, message); } public void Dispose() { producer.Dispose(); } } }
Этот класс KafkaProducer
будет отвечать за отправку сообщений в Kafka. Мы создаем экземпляр IProducer<string, string>
, который принимает объект ProducerConfig
в качестве параметра. В методе ProduceAsync
мы создаем новый объект Message<string, string>
и сериализуем параметр value
в формате JSON перед установкой его в качестве значения сообщения. Затем мы вызываем метод ProduceAsync
экземпляра producer
, чтобы отправить сообщение Kafka.
Затем создайте новый файл с именем KafkaConsumer.cs
и добавьте следующий код:
using System; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Newtonsoft.Json; namespace UserService.Services { public abstract class KafkaConsumer<T> : BackgroundService { private readonly ILogger logger; private readonly IConsumer<string, string> consumer; public KafkaConsumer(string bootstrapServers, string groupId, string topic, ILogger logger) { this.logger = logger; var config = new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupId, AutoOffsetReset = AutoOffsetReset.Earliest }; consumer = new ConsumerBuilder<string, string>(config).Build(); consumer.Subscribe(topic); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { var result = consumer.Consume(stoppingToken); if (result != null && !result.IsPartitionEOF) { var value = JsonConvert.DeserializeObject<T>(result.Value); await ConsumeAsync(value, stoppingToken); } } catch (OperationCanceledException) { logger.LogInformation("Kafka consumer has been cancelled."); break; } catch (Exception ex) { logger.LogError(ex, "An error occurred while consuming messages from Kafka."); } } consumer.Close(); } public override void Dispose() { consumer.Dispose(); base.Dispose(); } protected abstract Task ConsumeAsync(T value, CancellationToken stoppingToken); } }
Этот класс KafkaConsumer
является абстрактным классом, который вы можете наследовать в своих микросервисах для использования сообщений от Kafka. Мы создаем экземпляр IConsumer<string, string>
, который принимает объект ConsumerConfig
в качестве параметра. В методе ExecuteAsync
мы запускаем бесконечный цикл для получения сообщений от Kafka, используя метод Consume
экземпляра consumer
. Мы десериализуем значение сообщения в общий тип T
и вызываем метод ConsumeAsync
, который необходимо реализовать в микросервисе для обработки потребляемого сообщения.
Шаг 4. Добавьте службы API Gateway и Orchestrator
Добавьте в решение новый проект с именем ApiGateway
и еще один проект с именем Orchestrator
. В проекте ApiGateway
добавьте следующий код в файл Program.cs
:
using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; namespace ApiGateway { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); } }
В проекте Orchestrator
добавьте следующий код в файл Program.cs
:
using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; namespace Orchestrator { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); } }
Шаг 5. Добавьте конечные точки API в шлюз API
В проекте ApiGateway
добавьте новый файл с именем Startup.cs
и добавьте следующий код:
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using UserService.Services; namespace ApiGateway { public class Startup { private readonly IConfiguration configuration; public Startup(IConfiguration configuration) { this.configuration = configuration; } public void ConfigureServices(IServiceCollection services) { services.AddControllers(); // Register UserServiceClient as a singleton service services.AddSingleton<IUserServiceClient>(new UserServiceClient(configuration["UserServiceUrl"])); // Register KafkaProducer as a singleton service services.AddSingleton<KafkaProducer<UserCreatedEvent>>(new KafkaProducer<UserCreatedEvent>(configuration["KafkaBootstrapServers"], "user-created")); // Register UserService as a singleton service services.AddSingleton<UserService>(); // Register UserCreatedEventHandler as a singleton service services.AddSingleton<UserCreatedEventHandler>(); // Register the API Gateway's event handler services.AddSingleton<ApiGatewayEventHandler>(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } } }
В методе ConfigureServices
регистрируем следующие сервисы:
IUserServiceClient
: одноэлементный экземплярUserServiceClient
, который мы используем для вызова API микросервисаUserService
.KafkaProducer<UserCreatedEvent>
: одноэлементный экземплярKafkaProducer
, который мы используем для публикации сообщений в темеuser-created
в Kafka.UserService
: одноэлементный экземплярUserService
, который мы используем для обработки HTTP-запросов, отправляемых на конечную точку/users
шлюза API.UserCreatedEventHandler
: одноэлементный экземплярUserCreatedEventHandler
, который мы используем для обработки сообщенийUserCreatedEvent
, полученных от Kafka.ApiGatewayEventHandler
: одноэлементный экземплярApiGatewayEventHandler
, который мы используем для обработки событийUserCreated
, полученных отUserCreatedEventHandler
.
В методе Configure
мы добавляем промежуточное ПО Controllers
и сопоставляем конечные точки API с соответствующими контроллерами.
Шаг 6. Добавьте конечные точки API в микрослужбу UserService
В проекте UserService
добавьте новую папку с именем Controllers
и добавьте новый файл с именем UserController.cs
. Добавьте следующий код в файл UserController.cs
:
using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using UserService.Models; using UserService.Services; namespace UserService.Controllers { [ApiController] [Route("users")] public class UserController : ControllerBase { private readonly IUserService userService; private readonly KafkaProducer<UserCreatedEvent> kafkaProducer; public UserController(IUserService userService, KafkaProducer<UserCreatedEvent> kafkaProducer) { this.userService = userService; this.kafkaProducer = kafkaProducer; } [HttpPost] public async Task<IActionResult> CreateUser(CreateUserRequest request) { var user = await userService.CreateUserAsync(request); var userCreatedEvent = new UserCreatedEvent(user.Id, user.Name, user.Email); await kafkaProducer.ProduceAsync(userCreatedEvent); return CreatedAtAction(nameof(GetUser), new { id = user.Id }, user); } [HttpGet("{id}")] public async Task<IActionResult> GetUser(int id) { var user = await userService.GetUserAsync(id); if (user == null) { return NotFound(); } return Ok(user); } } }
В классе UserController
мы определяем две конечные точки HTTP:
- Сообщение `POST /users
: creates a new user and publishes a
UserCreatedEvent` для Kafka. GET /users/{id}
: извлекает существующего пользователя по идентификатору.
В методе CreateUser
мы сначала вызываем метод userService.CreateUserAsync
для создания нового пользователя. Затем мы создаем новый экземпляр UserCreatedEvent
и публикуем его в Kafka с помощью метода kafkaProducer.ProduceAsync
. Наконец, мы возвращаем ответ 201 Created
с данными вновь созданного пользователя.
В методе GetUser
мы вызываем метод userService.GetUserAsync
для получения существующего пользователя по идентификатору. Если пользователь не существует, мы возвращаем ответ 404 Not Found
. В противном случае мы возвращаем ответ 200 OK
с данными пользователя.
Шаг 7. Добавьте обработчики событий в микрослужбу UserService
В проекте UserService
добавьте новую папку с именем EventHandlers
и добавьте новый файл с именем UserCreatedEventHandler.cs
. Добавьте следующий код в файл UserCreatedEventHandler.cs
:
using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; using UserService.Models; using UserService.Services; namespace UserService.EventHandlers { public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent> { private readonly IUserService userService; private readonly ILogger<UserCreatedEventHandler> logger; public UserCreatedEventHandler(IUserService userService, ILogger<UserCreatedEventHandler> logger) { this.userService = userService; this.logger = logger; } public async Task HandleAsync(UserCreatedEvent message, ConsumeResult<string, string> consumeResult) { logger.LogInformation($"Received UserCreatedEvent message: {message}"); // Save the user data to the database var user = new User { Id = message.UserId, Name = message.UserName, Email = message.UserEmail }; await userService.CreateUserAsync(user); logger.LogInformation($"User with ID {message.UserId} created successfully"); } } }
В классе UserCreatedEventHandler
мы реализуем интерфейс IEventHandler<UserCreatedEvent>
, который определяет единственный метод HandleAsync
, который вызывается KafkaConsumer
при получении нового сообщения UserCreatedEvent
от Kafka.
В методе HandleAsync
мы сначала регистрируем полученное сообщение, используя экземпляр ILogger
. Затем мы создаем новый экземпляр User
из данных полученного сообщения и вызываем метод userService.CreateUserAsync
для сохранения пользователя в базе данных. Наконец, мы регистрируем сообщение об успехе с идентификатором пользователя.
Шаг 8. Добавьте обработчики событий в микрослужбу ApiGateway
В проекте ApiGateway
добавьте новую папку с именем EventHandlers
и добавьте новый файл с именем ApiGatewayEventHandler.cs
. Добавьте следующий код в файл ApiGatewayEventHandler.cs
:
using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; using UserService.Events; namespace ApiGateway.EventHandlers { public class ApiGatewayEventHandler : IEventHandler<UserCreated> { private readonly ILogger<ApiGatewayEventHandler> logger; public ApiGatewayEventHandler(ILogger<ApiGatewayEventHandler> logger) { this.logger = logger; } public Task HandleAsync(UserCreated message, ConsumeResult<string, string> consumeResult) { logger.LogInformation($"Received UserCreated event: {message}"); // TODO: Implement custom event handling logic here return Task.CompletedTask; } } }
В классе ApiGatewayEventHandler
мы реализуем интерфейс IEventHandler<UserCreated>
, который определяет одно событие HandleAsyncmethod that is called by the
KafkaConsumerwhen a new
UserCreated, полученное от Kafka.
В методе HandleAsync
мы сначала регистрируем полученное событие, используя экземпляр ILogger
. Затем мы можем реализовать пользовательскую логику обработки событий в соответствии с требованиями нашего приложения.
Шаг 9. Добавьте контрольный журнал в микрослужбу UserService
В проекте UserService
добавьте новую папку с именем AuditTrail
и добавьте новый файл с именем UserAuditTrail.cs
. Добавьте следующий код в файл UserAuditTrail.cs
:
using System.Threading.Tasks; using Microsoft.Extensions.Logging; using UserService.Models; using UserService.Services; namespace UserService.AuditTrail { public class UserAuditTrail : IUserAuditTrail { private readonly IUserService userService; private readonly ILogger<UserAuditTrail> logger; public UserAuditTrail(IUserService userService, ILogger<UserAuditTrail> logger) { this.userService = userService; this.logger = logger; } public async Task LogUserCreated(User user) { logger.LogInformation($"User with ID {user.Id} created"); // TODO: Save the audit trail to a database or other storage mechanism } } }
В классе UserAuditTrail
мы реализуем интерфейс IUserAuditTrail
, который определяет единственный метод LogUserCreated
, который вызывается классом UserService
при создании нового пользователя.
В методе LogUserCreated
мы сначала регистрируем сообщение о том, что пользователь был создан с использованием экземпляра ILogger
. Затем мы можем сохранить контрольный журнал в базе данных или другом механизме хранения в зависимости от требований нашего приложения.
Шаг 10. Обеспечьте согласованность данных с помощью координатора распределенных транзакций
В архитектуре микросервисов поддержание согласованности данных может быть сложной задачей, особенно когда несколько сервисов должны изменять одни и те же данные. Одним из решений этой проблемы является использование координатора распределенных транзакций, такого как Dapper или Entity Framework.
В проекте UserService
мы будем использовать Entity Framework для обеспечения согласованности данных. Во-первых, нам нужно добавить в проект пакет Microsoft.EntityFrameworkCore
NuGet.
Далее мы изменим UserService
, чтобы использовать Entity Framework вместо класса UserRepository
. Добавьте новый файл с именем ApplicationDbContext.cs
в проект UserService
и добавьте следующий код:
using Microsoft.EntityFrameworkCore; using UserService.Models; namespace UserService.Data { public class ApplicationDbContext : DbContext { public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options) : base(options) { } public DbSet<User> Users { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); modelBuilder.Entity<User>() .HasKey(u => u.Id); } } }
В классе ApplicationDbContext
мы определяем свойство DbSet<User>
и переопределяем метод OnModelCreating
для настройки первичного ключа объекта User
.
Далее мы изменим UserService
для использования Entity Framework. Замените метод CreateUserAsync
в файле UserService.cs
следующим кодом:
public async Task<User> CreateUserAsync(User user) { using var transaction = await dbContext.Database.BeginTransactionAsync(); try { await dbContext.Users.AddAsync(user); await dbContext.SaveChangesAsync(); await auditTrail.LogUserCreated(user); var userCreatedEvent = new UserCreatedEvent { UserId = user.Id, UserName = user.Name, UserEmail = user.Email }; await kafkaProducer.ProduceAsync("user-created", userCreatedEvent); await transaction.CommitAsync(); return user; } catch (Exception ex) { await transaction.RollbackAsync(); logger.LogError(ex, "Error creating user"); throw; } }
В методе CreateUserAsync
мы сначала начинаем новую транзакцию, используя метод dbContext.Database.BeginTransactionAsync
. Затем мы добавляем нового пользователя в таблицу Users
с помощью Entity Framework и сохраняем изменения с помощью метода dbContext.SaveChangesAsync
.
Затем мы регистрируем создание пользователя с помощью службы UserAuditTrail
, создаем событие UserCreated
с помощью производителя Kafka и фиксируем транзакцию с помощью метода transaction.CommitAsync
.
Если во время транзакции возникает исключение, мы откатываем изменения с помощью метода transaction.RollbackAsync
, регистрируем ошибку с помощью экземпляра ILogger
и повторно выбрасываем исключение.
Шаг 11. Запустите микросервисы
Чтобы запустить микросервисы, нам нужно запустить следующие сервисы по порядку:
- Запустите сервер Кафки
- Запустите службу контрольного журнала
- Запустите службу пользователей
- Запустите ApiGateway
Чтобы запустить сервер Kafka, вы можете следовать документации Kafka для своей платформы.
Чтобы запустить службу контрольного журнала, перейдите в каталог проекта AuditTrailService
и выполните следующую команду:
dotnet run
Чтобы запустить UserService, перейдите в каталог проекта UserService
и выполните следующую команду:
dotnet run
Чтобы запустить ApiGateway, перейдите в каталог проекта ApiGateway
и выполните следующую команду:
dotnet run
Шаг 12. Тестирование микросервисов
Чтобы протестировать микросервисы, вы можете использовать такой инструмент, как Postman или curl, для отправки HTTP-запросов на ApiGateway. Ниже приведены примеры запросов, которые вы можете отправить:
- Создайте нового пользователя:
curl -X POST -H "Content-Type: application/json" \ -d '{"name": "John Doe", "email": "[email protected]"}' \ http://localhost:5000/api/users
2. Получить всех пользователей:
curl http://localhost:5000/api/users
3. Получить конкретного пользователя по ID:
curl http://localhost:5000/api/users/1
Заключение
В этом руководстве мы продемонстрировали, как построить архитектуру микросервисов с использованием .Net 7, API-шлюза, Kafka, оркестратора, контрольного журнала и согласованности данных. Мы внедрили микрослужбу UserService, которая позволяет нам создавать и извлекать пользователей, и продемонстрировали, как использовать Kafka для создания и использования событий, журнал аудита для регистрации событий и Entity Framework для обеспечения согласованности данных.