Въведение: Архитектурата на микросервизите се превърна в популярен начин за изграждане на мащабируеми и стабилни приложения. В този урок ще проучим как да изградим архитектура на микроуслуга с помощта на .NET 7 и Kafka. Ще разгледаме и теми като API Gateway, Orchestrator, Audit Trail и Data Consistency. Ще предоставим примерен код за всяка микроуслуга, за да ви помогнем да започнете.
Настройване на инфраструктурата В този раздел ще настроим инфраструктурата, необходима за изграждане на нашата микросервизна архитектура. Ще използваме Docker и Docker Compose, за да настроим Kafka, API Gateway, Orchestrator и микроуслугите. Ние също така ще конфигурираме всяка услуга да комуникира с Kafka.
- Изграждане на микроуслуги В този раздел ще създадем три микроуслуги: потребителска услуга, продуктова услуга и поръчка. Всяка микроуслуга ще има своя собствена база данни и ще комуникира с Kafka, за да изпраща и получава съобщения. Ще предоставим примерен код за всяка микроуслуга.
- Внедряване на API Gateway В този раздел ще внедрим API Gateway с помощта на Ocelot. API Gateway ще получава заявки от клиенти и ще ги насочва към съответната микроуслуга. Ще предоставим примерен код за конфигуриране на API Gateway за работа с микроуслугите.
- Внедряване на оркестратор В този раздел ще внедрим оркестратор с помощта на MassTransit. Оркестраторът ще координира комуникацията между микроуслугите, за да осигури последователност на данните. Ще предоставим примерен код за конфигуриране на Orchestrator за работа с микроуслугите.
- Внедряване на одитна пътека В този раздел ще внедрим одитна пътека с помощта на Serilog. Одитната пътека ще записва всички действия, предприети в микроуслугите за целите на отстраняване на грешки и анализ. Ще предоставим примерен код за конфигуриране на одитната пътека за работа с микроуслугите.
- Осигуряване на съгласуваност на данните В този раздел ще осигурим съгласуваност на данните чрез прилагане на Saga Pattern с помощта на MassTransit. Saga Pattern ще гарантира, че всички микроуслуги, участващи в дадена транзакция, са успешни или отменени, ако една от тях се провали. Ще предоставим примерен код за внедряване на Saga Pattern в нашите микроуслуги.
Настройване на инфраструктурата
Първо, трябва да настроим инфраструктурата, необходима за изграждане на нашата микросервизна архитектура. Ще използваме Docker и Docker Compose, за да настроим Kafka, API Gateway, Orchestrator и микроуслугите. Ние също така ще конфигурираме всяка услуга да комуникира с Kafka.
Стъпка 1: Инсталирайте Docker и Docker Compose
Ако нямате инсталирани Docker и Docker Compose на вашата система, следвайте инструкциите, предоставени в официалната документация, за да ги инсталирате.
Стъпка 2: Създайте файл за съставяне на Docker
Създайте нов файл с име docker-compose.yml
в основната директория на вашия проект и добавете следния код:
version: '3.9' services: zookeeper: image: confluentinc/cp-zookeeper:6.1.1 container_name: zookeeper ports: - "2181:2181" networks: - kafka-network kafka: image: confluentinc/cp-kafka:6.1.1 container_name: kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 networks: - kafka-network networks: kafka-network: driver: bridge
Този docker-compose.yml
файл ще създаде две услуги: ZooKeeper и Kafka. ZooKeeper е услуга за координация, използвана от Kafka, докато Kafka е системата за съобщения, използвана от нашите микроуслуги за комуникация помежду си. Ние също така дефинираме мрежа, наречена kafka-network
, към която принадлежат и двете услуги.
Стъпка 3: Конфигурирайте микроуслугите за комуникация с Kafka
Във всяка микроуслуга трябва да добавим клиентската библиотека на Kafka и да я конфигурираме да комуникира с услугата Kafka, която току-що създадохме. Ще използваме библиотеката Confluent.Kafka, която е .NET клиент за Kafka.
В микроуслугата User
добавете пакета Confluent.Kafka NuGet към проекта. След това създайте нов файл с име KafkaProducer.cs
и добавете следния код:
using System; using System.Threading.Tasks; using Confluent.Kafka; using Newtonsoft.Json; namespace UserService.Services { public class KafkaProducer : IDisposable { private readonly IProducer<string, string> producer; public KafkaProducer(string bootstrapServers) { var config = new ProducerConfig { BootstrapServers = bootstrapServers }; producer = new ProducerBuilder<string, string>(config).Build(); } public async Task ProduceAsync<T>(string topic, T value) { var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = JsonConvert.SerializeObject(value) }; await producer.ProduceAsync(topic, message); } public void Dispose() { producer.Dispose(); } } }
Този KafkaProducer
клас ще отговаря за изпращането на съобщения до Kafka. Създаваме екземпляр на IProducer<string, string>
, който приема ProducerConfig
обект като параметър. В метода ProduceAsync
създаваме нов Message<string, string>
обект и сериализираме параметъра value
във формат JSON, преди да го зададем като стойност на съобщението. След това извикваме метода ProduceAsync
на екземпляра producer
, за да изпратим съобщението до Kafka.
След това създайте нов файл с име KafkaConsumer.cs
и добавете следния код:
using System; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Newtonsoft.Json; namespace UserService.Services { public abstract class KafkaConsumer<T> : BackgroundService { private readonly ILogger logger; private readonly IConsumer<string, string> consumer; public KafkaConsumer(string bootstrapServers, string groupId, string topic, ILogger logger) { this.logger = logger; var config = new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupId, AutoOffsetReset = AutoOffsetReset.Earliest }; consumer = new ConsumerBuilder<string, string>(config).Build(); consumer.Subscribe(topic); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { var result = consumer.Consume(stoppingToken); if (result != null && !result.IsPartitionEOF) { var value = JsonConvert.DeserializeObject<T>(result.Value); await ConsumeAsync(value, stoppingToken); } } catch (OperationCanceledException) { logger.LogInformation("Kafka consumer has been cancelled."); break; } catch (Exception ex) { logger.LogError(ex, "An error occurred while consuming messages from Kafka."); } } consumer.Close(); } public override void Dispose() { consumer.Dispose(); base.Dispose(); } protected abstract Task ConsumeAsync(T value, CancellationToken stoppingToken); } }
Този KafkaConsumer
клас е абстрактен клас, който можете да наследите от вашите микроуслуги, за да консумирате съобщения от Kafka. Създаваме екземпляр на IConsumer<string, string>
, който приема обект ConsumerConfig
като параметър. В метода ExecuteAsync
стартираме безкраен цикъл, за да консумираме съобщения от Kafka, използвайки метода Consume
на екземпляра consumer
. Ние десериализираме стойността на съобщението до общия тип T
и извикваме метода ConsumeAsync
, който трябва да внедрите във вашата микроуслуга, за да се справите с консумираното съобщение.
Стъпка 4: Добавете API Gateway и Orchestrator услуги
Добавете нов проект към вашето решение с име ApiGateway
и друг проект с име Orchestrator
. В проекта ApiGateway
добавете следния код към файла Program.cs
:
using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; namespace ApiGateway { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); } }
В проекта Orchestrator
добавете следния код към файла Program.cs
:
using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; namespace Orchestrator { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); } }
Стъпка 5: Добавете крайни точки на API към API Gateway
В проекта ApiGateway
добавете нов файл с име Startup.cs
и добавете следния код:
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using UserService.Services; namespace ApiGateway { public class Startup { private readonly IConfiguration configuration; public Startup(IConfiguration configuration) { this.configuration = configuration; } public void ConfigureServices(IServiceCollection services) { services.AddControllers(); // Register UserServiceClient as a singleton service services.AddSingleton<IUserServiceClient>(new UserServiceClient(configuration["UserServiceUrl"])); // Register KafkaProducer as a singleton service services.AddSingleton<KafkaProducer<UserCreatedEvent>>(new KafkaProducer<UserCreatedEvent>(configuration["KafkaBootstrapServers"], "user-created")); // Register UserService as a singleton service services.AddSingleton<UserService>(); // Register UserCreatedEventHandler as a singleton service services.AddSingleton<UserCreatedEventHandler>(); // Register the API Gateway's event handler services.AddSingleton<ApiGatewayEventHandler>(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } } }
В метода ConfigureServices
регистрираме следните услуги:
IUserServiceClient
: единичен екземпляр наUserServiceClient
, който използваме за извикване на API на микроуслугатаUserService
.KafkaProducer<UserCreatedEvent>
: еднократно копие наKafkaProducer
, което използваме за публикуване на съобщения в тематаuser-created
в Kafka.UserService
: единичен екземпляр наUserService
, който използваме за обработка на HTTP заявките, изпратени до крайната точка/users
на API Gateway.UserCreatedEventHandler
: единичен екземпляр наUserCreatedEventHandler
, който използваме за обработка наUserCreatedEvent
съобщенията, получени от Kafka.ApiGatewayEventHandler
: единичен екземпляр наApiGatewayEventHandler
, който използваме за обработка наUserCreated
събитията, получени отUserCreatedEventHandler
.
В метода Configure
добавяме междинния софтуер Controllers
и съпоставяме крайните точки на API към съответните им контролери.
Стъпка 6: Добавете крайни точки на API към микроуслугата UserService
В проекта UserService
добавете нова папка с име Controllers
и добавете нов файл с име UserController.cs
. Добавете следния код към файла UserController.cs
:
using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using UserService.Models; using UserService.Services; namespace UserService.Controllers { [ApiController] [Route("users")] public class UserController : ControllerBase { private readonly IUserService userService; private readonly KafkaProducer<UserCreatedEvent> kafkaProducer; public UserController(IUserService userService, KafkaProducer<UserCreatedEvent> kafkaProducer) { this.userService = userService; this.kafkaProducer = kafkaProducer; } [HttpPost] public async Task<IActionResult> CreateUser(CreateUserRequest request) { var user = await userService.CreateUserAsync(request); var userCreatedEvent = new UserCreatedEvent(user.Id, user.Name, user.Email); await kafkaProducer.ProduceAsync(userCreatedEvent); return CreatedAtAction(nameof(GetUser), new { id = user.Id }, user); } [HttpGet("{id}")] public async Task<IActionResult> GetUser(int id) { var user = await userService.GetUserAsync(id); if (user == null) { return NotFound(); } return Ok(user); } } }
В класа UserController
дефинираме две крайни точки на HTTP:
- `ПОСТ /users
: creates a new user and publishes a
UserCreatedEvent` съобщение до Kafka. GET /users/{id}
: извлича съществуващ потребител по ID.
В метода CreateUser
първо извикваме метода userService.CreateUserAsync
, за да създадем нов потребител. След това създаваме нов екземпляр UserCreatedEvent
и го публикуваме в Kafka с помощта на метода kafkaProducer.ProduceAsync
. Накрая връщаме отговор 201 Created
с данните на новосъздадения потребител.
В метода GetUser
извикваме метода userService.GetUserAsync
, за да извлечем съществуващ потребител по ID. Ако потребителят не съществува, връщаме отговор 404 Not Found
. В противен случай връщаме отговор 200 OK
с данните на потребителя.
Стъпка 7: Добавете манипулатори на събития към микроуслугата UserService
В проекта UserService
добавете нова папка с име EventHandlers
и добавете нов файл с име UserCreatedEventHandler.cs
. Добавете следния код към файла UserCreatedEventHandler.cs
:
using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; using UserService.Models; using UserService.Services; namespace UserService.EventHandlers { public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent> { private readonly IUserService userService; private readonly ILogger<UserCreatedEventHandler> logger; public UserCreatedEventHandler(IUserService userService, ILogger<UserCreatedEventHandler> logger) { this.userService = userService; this.logger = logger; } public async Task HandleAsync(UserCreatedEvent message, ConsumeResult<string, string> consumeResult) { logger.LogInformation($"Received UserCreatedEvent message: {message}"); // Save the user data to the database var user = new User { Id = message.UserId, Name = message.UserName, Email = message.UserEmail }; await userService.CreateUserAsync(user); logger.LogInformation($"User with ID {message.UserId} created successfully"); } } }
В класа UserCreatedEventHandler
ние внедряваме интерфейса IEventHandler<UserCreatedEvent>
, който дефинира единичен HandleAsync
метод, който се извиква от KafkaConsumer
, когато се получи ново UserCreatedEvent
съобщение от Kafka.
В метода HandleAsync
първо регистрираме полученото съобщение, използвайки екземпляра ILogger
. След това създаваме нов екземпляр User
от данните на полученото съобщение и извикваме метода userService.CreateUserAsync
, за да запазим потребителя в базата данни. Накрая регистрираме съобщение за успех с идентификатора на потребителя.
Стъпка 8: Добавете манипулатори на събития към микроуслугата ApiGateway
В проекта ApiGateway
добавете нова папка с име EventHandlers
и добавете нов файл с име ApiGatewayEventHandler.cs
. Добавете следния код към файла ApiGatewayEventHandler.cs
:
using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Logging; using UserService.Events; namespace ApiGateway.EventHandlers { public class ApiGatewayEventHandler : IEventHandler<UserCreated> { private readonly ILogger<ApiGatewayEventHandler> logger; public ApiGatewayEventHandler(ILogger<ApiGatewayEventHandler> logger) { this.logger = logger; } public Task HandleAsync(UserCreated message, ConsumeResult<string, string> consumeResult) { logger.LogInformation($"Received UserCreated event: {message}"); // TODO: Implement custom event handling logic here return Task.CompletedTask; } } }
В класа ApiGatewayEventHandler
ние внедряваме интерфейса IEventHandler<UserCreated>
, който дефинира едно събитие `HandleAsyncmethod that is called by the
KafkaConsumerwhen a new
UserCreated`, получено от Kafka.
В метода HandleAsync
първо регистрираме полученото събитие, използвайки екземпляра ILogger
. След това можем да внедрим персонализирана логика за обработка на събития тук въз основа на изискванията на нашето приложение.
Стъпка 9: Добавете одитна пътека към микроуслугата UserService
В проекта UserService
добавете нова папка с име AuditTrail
и добавете нов файл с име UserAuditTrail.cs
. Добавете следния код към файла UserAuditTrail.cs
:
using System.Threading.Tasks; using Microsoft.Extensions.Logging; using UserService.Models; using UserService.Services; namespace UserService.AuditTrail { public class UserAuditTrail : IUserAuditTrail { private readonly IUserService userService; private readonly ILogger<UserAuditTrail> logger; public UserAuditTrail(IUserService userService, ILogger<UserAuditTrail> logger) { this.userService = userService; this.logger = logger; } public async Task LogUserCreated(User user) { logger.LogInformation($"User with ID {user.Id} created"); // TODO: Save the audit trail to a database or other storage mechanism } } }
В класа UserAuditTrail
ние внедряваме интерфейса IUserAuditTrail
, който дефинира единичен LogUserCreated
метод, който се извиква от UserService
, когато се създава нов потребител.
В метода LogUserCreated
първо регистрираме съобщение, което показва, че потребител е създаден с помощта на екземпляра ILogger
. След това можем да запазим одитната пътека в база данни или друг механизъм за съхранение въз основа на изискванията на нашето приложение.
Стъпка 10: Осигурете последователност на данните с помощта на координатор на разпределени транзакции
В архитектура на микроуслуги поддържането на съгласуваност на данните може да бъде предизвикателство, особено когато множество услуги трябва да променят едни и същи данни. Едно решение на този проблем е да се използва координатор на разпределени транзакции, като Dapper или Entity Framework.
В проекта UserService
ще използваме Entity Framework, за да осигурим съгласуваност на данните. Първо, трябва да добавим пакета Microsoft.EntityFrameworkCore
NuGet към проекта.
След това ще модифицираме UserService
, за да използваме Entity Framework вместо класа UserRepository
. Добавете нов файл с име ApplicationDbContext.cs
към проекта UserService
и добавете следния код:
using Microsoft.EntityFrameworkCore; using UserService.Models; namespace UserService.Data { public class ApplicationDbContext : DbContext { public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options) : base(options) { } public DbSet<User> Users { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); modelBuilder.Entity<User>() .HasKey(u => u.Id); } } }
В класа ApplicationDbContext
ние дефинираме свойство DbSet<User>
и заместваме метода OnModelCreating
, за да конфигурираме първичния ключ на обекта User
.
След това ще модифицираме UserService
, за да използваме Entity Framework. Заменете метода CreateUserAsync
във файла UserService.cs
със следния код:
public async Task<User> CreateUserAsync(User user) { using var transaction = await dbContext.Database.BeginTransactionAsync(); try { await dbContext.Users.AddAsync(user); await dbContext.SaveChangesAsync(); await auditTrail.LogUserCreated(user); var userCreatedEvent = new UserCreatedEvent { UserId = user.Id, UserName = user.Name, UserEmail = user.Email }; await kafkaProducer.ProduceAsync("user-created", userCreatedEvent); await transaction.CommitAsync(); return user; } catch (Exception ex) { await transaction.RollbackAsync(); logger.LogError(ex, "Error creating user"); throw; } }
В метода CreateUserAsync
първо започваме нова транзакция, използвайки метода dbContext.Database.BeginTransactionAsync
. След това добавяме новия потребител към таблицата Users
с помощта на Entity Framework и запазваме промените с помощта на метода dbContext.SaveChangesAsync
.
След това регистрираме създаването на потребител с помощта на услугата UserAuditTrail
, създаваме събитие UserCreated
с помощта на производителя на Kafka и извършваме транзакцията с помощта на метода transaction.CommitAsync
.
Ако възникне изключение по време на транзакцията, ние връщаме назад промените с помощта на метода transaction.RollbackAsync
, регистрираме грешката с помощта на екземпляра ILogger
и хвърляме отново изключението.
Стъпка 11: Стартирайте микроуслугите
За да стартираме микроуслугите, трябва да стартираме следните услуги по ред:
- Стартирайте Kafka сървъра
- Стартирайте услугата за одитна пътека
- Стартирайте UserService
- Стартирайте ApiGateway
За да стартирате Kafka сървъра, можете да следвате документацията на Kafka за вашата платформа.
За да стартирате услугата за одитна пътека, отидете до директорията на проекта AuditTrailService
и изпълнете следната команда:
dotnet run
За да стартирате UserService, отидете до директорията на проекта UserService
и изпълнете следната команда:
dotnet run
За да стартирате ApiGateway, отидете до директорията на проекта ApiGateway
и изпълнете следната команда:
dotnet run
Стъпка 12: Тествайте микроуслугите
За да тествате микроуслугите, можете да използвате инструмент като Postman или curl, за да изпращате HTTP заявки към ApiGateway. Следват някои примерни заявки, които можете да изпратите:
- Създайте нов потребител:
curl -X POST -H "Content-Type: application/json" \ -d '{"name": "John Doe", "email": "[email protected]"}' \ http://localhost:5000/api/users
2. Вземете всички потребители:
curl http://localhost:5000/api/users
3. Вземете конкретен потребител по ID:
curl http://localhost:5000/api/users/1
Заключение
В този урок демонстрирахме как да изградим архитектура на микроуслуги с помощта на .Net 7, api-gateway, Kafka, оркестратор, одитна пътека и последователност на данните. Внедрихме микроуслугата UserService, която ни позволява да създаваме и извличаме потребители, и демонстрирахме как да използваме Kafka за създаване и консумиране на събития, одитна пътека за регистриране на събития и Entity Framework, за да гарантираме съгласуваност на данните.