Въведение: Архитектурата на микросервизите се превърна в популярен начин за изграждане на мащабируеми и стабилни приложения. В този урок ще проучим как да изградим архитектура на микроуслуга с помощта на .NET 7 и Kafka. Ще разгледаме и теми като API Gateway, Orchestrator, Audit Trail и Data Consistency. Ще предоставим примерен код за всяка микроуслуга, за да ви помогнем да започнете.

Настройване на инфраструктурата В този раздел ще настроим инфраструктурата, необходима за изграждане на нашата микросервизна архитектура. Ще използваме Docker и Docker Compose, за да настроим Kafka, API Gateway, Orchestrator и микроуслугите. Ние също така ще конфигурираме всяка услуга да комуникира с Kafka.

  • Изграждане на микроуслуги В този раздел ще създадем три микроуслуги: потребителска услуга, продуктова услуга и поръчка. Всяка микроуслуга ще има своя собствена база данни и ще комуникира с Kafka, за да изпраща и получава съобщения. Ще предоставим примерен код за всяка микроуслуга.
  • Внедряване на API Gateway В този раздел ще внедрим API Gateway с помощта на Ocelot. API Gateway ще получава заявки от клиенти и ще ги насочва към съответната микроуслуга. Ще предоставим примерен код за конфигуриране на API Gateway за работа с микроуслугите.
  • Внедряване на оркестратор В този раздел ще внедрим оркестратор с помощта на MassTransit. Оркестраторът ще координира комуникацията между микроуслугите, за да осигури последователност на данните. Ще предоставим примерен код за конфигуриране на Orchestrator за работа с микроуслугите.
  • Внедряване на одитна пътека В този раздел ще внедрим одитна пътека с помощта на Serilog. Одитната пътека ще записва всички действия, предприети в микроуслугите за целите на отстраняване на грешки и анализ. Ще предоставим примерен код за конфигуриране на одитната пътека за работа с микроуслугите.
  • Осигуряване на съгласуваност на данните В този раздел ще осигурим съгласуваност на данните чрез прилагане на Saga Pattern с помощта на MassTransit. Saga Pattern ще гарантира, че всички микроуслуги, участващи в дадена транзакция, са успешни или отменени, ако една от тях се провали. Ще предоставим примерен код за внедряване на Saga Pattern в нашите микроуслуги.

Настройване на инфраструктурата

Първо, трябва да настроим инфраструктурата, необходима за изграждане на нашата микросервизна архитектура. Ще използваме Docker и Docker Compose, за да настроим Kafka, API Gateway, Orchestrator и микроуслугите. Ние също така ще конфигурираме всяка услуга да комуникира с Kafka.

Стъпка 1: Инсталирайте Docker и Docker Compose

Ако нямате инсталирани Docker и Docker Compose на вашата система, следвайте инструкциите, предоставени в официалната документация, за да ги инсталирате.

Стъпка 2: Създайте файл за съставяне на Docker

Създайте нов файл с име docker-compose.yml в основната директория на вашия проект и добавете следния код:

version: '3.9'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network
  kafka:
    image: confluentinc/cp-kafka:6.1.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - kafka-network
networks:
  kafka-network:
    driver: bridge

Този docker-compose.yml файл ще създаде две услуги: ZooKeeper и Kafka. ZooKeeper е услуга за координация, използвана от Kafka, докато Kafka е системата за съобщения, използвана от нашите микроуслуги за комуникация помежду си. Ние също така дефинираме мрежа, наречена kafka-network, към която принадлежат и двете услуги.

Стъпка 3: Конфигурирайте микроуслугите за комуникация с Kafka

Във всяка микроуслуга трябва да добавим клиентската библиотека на Kafka и да я конфигурираме да комуникира с услугата Kafka, която току-що създадохме. Ще използваме библиотеката Confluent.Kafka, която е .NET клиент за Kafka.

В микроуслугата User добавете пакета Confluent.Kafka NuGet към проекта. След това създайте нов файл с име KafkaProducer.cs и добавете следния код:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

namespace UserService.Services
{
    public class KafkaProducer : IDisposable
    {
        private readonly IProducer<string, string> producer;
        public KafkaProducer(string bootstrapServers)
        {
            var config = new ProducerConfig { BootstrapServers = bootstrapServers };
            producer = new ProducerBuilder<string, string>(config).Build();
        }
        public async Task ProduceAsync<T>(string topic, T value)
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = JsonConvert.SerializeObject(value)
            };
            await producer.ProduceAsync(topic, message);
        }
        public void Dispose()
        {
            producer.Dispose();
        }
    }
}

Този KafkaProducer клас ще отговаря за изпращането на съобщения до Kafka. Създаваме екземпляр на IProducer<string, string>, който приема ProducerConfig обект като параметър. В метода ProduceAsync създаваме нов Message<string, string> обект и сериализираме параметъра value във формат JSON, преди да го зададем като стойност на съобщението. След това извикваме метода ProduceAsync на екземпляра producer, за да изпратим съобщението до Kafka.

След това създайте нов файл с име KafkaConsumer.cs и добавете следния код:

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace UserService.Services
{
    public abstract class KafkaConsumer<T> : BackgroundService
    {
        private readonly ILogger logger;
        private readonly IConsumer<string, string> consumer;
        public KafkaConsumer(string bootstrapServers, string groupId, string topic, ILogger logger)
        {
            this.logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = groupId,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe(topic);
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    if (result != null && !result.IsPartitionEOF)
                    {
                        var value = JsonConvert.DeserializeObject<T>(result.Value);
                        await ConsumeAsync(value, stoppingToken);
                    }
                }
                catch (OperationCanceledException)
                {
                    logger.LogInformation("Kafka consumer has been cancelled.");
                    break;
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "An error occurred while consuming messages from Kafka.");
                }
            }
            consumer.Close();
        }
        public override void Dispose()
        {
            consumer.Dispose();
            base.Dispose();
        }
        protected abstract Task ConsumeAsync(T value, CancellationToken stoppingToken);
    }
}

Този KafkaConsumer клас е абстрактен клас, който можете да наследите от вашите микроуслуги, за да консумирате съобщения от Kafka. Създаваме екземпляр на IConsumer<string, string>, който приема обект ConsumerConfig като параметър. В метода ExecuteAsync стартираме безкраен цикъл, за да консумираме съобщения от Kafka, използвайки метода Consume на екземпляра consumer. Ние десериализираме стойността на съобщението до общия тип T и извикваме метода ConsumeAsync, който трябва да внедрите във вашата микроуслуга, за да се справите с консумираното съобщение.

Стъпка 4: Добавете API Gateway и Orchestrator услуги

Добавете нов проект към вашето решение с име ApiGateway и друг проект с име Orchestrator. В проекта ApiGateway добавете следния код към файла Program.cs:

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace ApiGateway
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}

В проекта Orchestrator добавете следния код към файла Program.cs:

using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Orchestrator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }
}

Стъпка 5: Добавете крайни точки на API към API Gateway

В проекта ApiGateway добавете нов файл с име Startup.cs и добавете следния код:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using UserService.Services;

namespace ApiGateway
{
    public class Startup
    {
        private readonly IConfiguration configuration;
        public Startup(IConfiguration configuration)
        {
            this.configuration = configuration;
        }
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            // Register UserServiceClient as a singleton service
            services.AddSingleton<IUserServiceClient>(new UserServiceClient(configuration["UserServiceUrl"]));
            // Register KafkaProducer as a singleton service
            services.AddSingleton<KafkaProducer<UserCreatedEvent>>(new KafkaProducer<UserCreatedEvent>(configuration["KafkaBootstrapServers"], "user-created"));
            // Register UserService as a singleton service
            services.AddSingleton<UserService>();
            // Register UserCreatedEventHandler as a singleton service
            services.AddSingleton<UserCreatedEventHandler>();
            // Register the API Gateway's event handler
            services.AddSingleton<ApiGatewayEventHandler>();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}

В метода ConfigureServices регистрираме следните услуги:

  • IUserServiceClient: единичен екземпляр на UserServiceClient, който използваме за извикване на API на микроуслугата UserService.
  • KafkaProducer<UserCreatedEvent>: еднократно копие на KafkaProducer, което използваме за публикуване на съобщения в темата user-created в Kafka.
  • UserService: единичен екземпляр на UserService, който използваме за обработка на HTTP заявките, изпратени до крайната точка /users на API Gateway.
  • UserCreatedEventHandler: единичен екземпляр на UserCreatedEventHandler, който използваме за обработка на UserCreatedEvent съобщенията, получени от Kafka.
  • ApiGatewayEventHandler: единичен екземпляр на ApiGatewayEventHandler, който използваме за обработка на UserCreated събитията, получени от UserCreatedEventHandler.

В метода Configure добавяме междинния софтуер Controllers и съпоставяме крайните точки на API към съответните им контролери.

Стъпка 6: Добавете крайни точки на API към микроуслугата UserService

В проекта UserService добавете нова папка с име Controllers и добавете нов файл с име UserController.cs. Добавете следния код към файла UserController.cs:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using UserService.Models;
using UserService.Services;

namespace UserService.Controllers
{
    [ApiController]
    [Route("users")]
    public class UserController : ControllerBase
    {
        private readonly IUserService userService;
        private readonly KafkaProducer<UserCreatedEvent> kafkaProducer;
        public UserController(IUserService userService, KafkaProducer<UserCreatedEvent> kafkaProducer)
        {
            this.userService = userService;
            this.kafkaProducer = kafkaProducer;
        }
        [HttpPost]
        public async Task<IActionResult> CreateUser(CreateUserRequest request)
        {
            var user = await userService.CreateUserAsync(request);
            var userCreatedEvent = new UserCreatedEvent(user.Id, user.Name, user.Email);
            await kafkaProducer.ProduceAsync(userCreatedEvent);
            return CreatedAtAction(nameof(GetUser), new { id = user.Id }, user);
        }
        [HttpGet("{id}")]
        public async Task<IActionResult> GetUser(int id)
        {
            var user = await userService.GetUserAsync(id);
            if (user == null)
            {
                return NotFound();
            }
            return Ok(user);
        }
    }
}

В класа UserController дефинираме две крайни точки на HTTP:

  • `ПОСТ /users: creates a new user and publishes a UserCreatedEvent` съобщение до Kafka.
  • GET /users/{id}: извлича съществуващ потребител по ID.

В метода CreateUser първо извикваме метода userService.CreateUserAsync, за да създадем нов потребител. След това създаваме нов екземпляр UserCreatedEvent и го публикуваме в Kafka с помощта на метода kafkaProducer.ProduceAsync. Накрая връщаме отговор 201 Created с данните на новосъздадения потребител.

В метода GetUser извикваме метода userService.GetUserAsync, за да извлечем съществуващ потребител по ID. Ако потребителят не съществува, връщаме отговор 404 Not Found. В противен случай връщаме отговор 200 OK с данните на потребителя.

Стъпка 7: Добавете манипулатори на събития към микроуслугата UserService

В проекта UserService добавете нова папка с име EventHandlers и добавете нов файл с име UserCreatedEventHandler.cs. Добавете следния код към файла UserCreatedEventHandler.cs:

using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using UserService.Models;
using UserService.Services;

namespace UserService.EventHandlers
{
    public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
    {
        private readonly IUserService userService;
        private readonly ILogger<UserCreatedEventHandler> logger;
        public UserCreatedEventHandler(IUserService userService, ILogger<UserCreatedEventHandler> logger)
        {
            this.userService = userService;
            this.logger = logger;
        }
        public async Task HandleAsync(UserCreatedEvent message, ConsumeResult<string, string> consumeResult)
        {
            logger.LogInformation($"Received UserCreatedEvent message: {message}");
            // Save the user data to the database
            var user = new User
            {
                Id = message.UserId,
                Name = message.UserName,
                Email = message.UserEmail
            };
            await userService.CreateUserAsync(user);
            logger.LogInformation($"User with ID {message.UserId} created successfully");
        }
    }
}

В класа UserCreatedEventHandler ние внедряваме интерфейса IEventHandler<UserCreatedEvent>, който дефинира единичен HandleAsync метод, който се извиква от KafkaConsumer, когато се получи ново UserCreatedEvent съобщение от Kafka.

В метода HandleAsync първо регистрираме полученото съобщение, използвайки екземпляра ILogger. След това създаваме нов екземпляр User от данните на полученото съобщение и извикваме метода userService.CreateUserAsync, за да запазим потребителя в базата данни. Накрая регистрираме съобщение за успех с идентификатора на потребителя.

Стъпка 8: Добавете манипулатори на събития към микроуслугата ApiGateway

В проекта ApiGateway добавете нова папка с име EventHandlers и добавете нов файл с име ApiGatewayEventHandler.cs. Добавете следния код към файла ApiGatewayEventHandler.cs:

using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using UserService.Events;

namespace ApiGateway.EventHandlers
{
    public class ApiGatewayEventHandler : IEventHandler<UserCreated>
    {
        private readonly ILogger<ApiGatewayEventHandler> logger;
        public ApiGatewayEventHandler(ILogger<ApiGatewayEventHandler> logger)
        {
            this.logger = logger;
        }
        public Task HandleAsync(UserCreated message, ConsumeResult<string, string> consumeResult)
        {
            logger.LogInformation($"Received UserCreated event: {message}");
            // TODO: Implement custom event handling logic here
            return Task.CompletedTask;
        }
    }
}

В класа ApiGatewayEventHandler ние внедряваме интерфейса IEventHandler<UserCreated>, който дефинира едно събитие `HandleAsyncmethod that is called by theKafkaConsumerwhen a newUserCreated`, получено от Kafka.

В метода HandleAsync първо регистрираме полученото събитие, използвайки екземпляра ILogger. След това можем да внедрим персонализирана логика за обработка на събития тук въз основа на изискванията на нашето приложение.

Стъпка 9: Добавете одитна пътека към микроуслугата UserService

В проекта UserService добавете нова папка с име AuditTrail и добавете нов файл с име UserAuditTrail.cs. Добавете следния код към файла UserAuditTrail.cs:

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using UserService.Models;
using UserService.Services;

namespace UserService.AuditTrail
{
    public class UserAuditTrail : IUserAuditTrail
    {
        private readonly IUserService userService;
        private readonly ILogger<UserAuditTrail> logger;
        public UserAuditTrail(IUserService userService, ILogger<UserAuditTrail> logger)
        {
            this.userService = userService;
            this.logger = logger;
        }
        public async Task LogUserCreated(User user)
        {
            logger.LogInformation($"User with ID {user.Id} created");
            // TODO: Save the audit trail to a database or other storage mechanism
        }
    }
}

В класа UserAuditTrail ние внедряваме интерфейса IUserAuditTrail, който дефинира единичен LogUserCreated метод, който се извиква от UserService, когато се създава нов потребител.

В метода LogUserCreated първо регистрираме съобщение, което показва, че потребител е създаден с помощта на екземпляра ILogger. След това можем да запазим одитната пътека в база данни или друг механизъм за съхранение въз основа на изискванията на нашето приложение.

Стъпка 10: Осигурете последователност на данните с помощта на координатор на разпределени транзакции

В архитектура на микроуслуги поддържането на съгласуваност на данните може да бъде предизвикателство, особено когато множество услуги трябва да променят едни и същи данни. Едно решение на този проблем е да се използва координатор на разпределени транзакции, като Dapper или Entity Framework.

В проекта UserService ще използваме Entity Framework, за да осигурим съгласуваност на данните. Първо, трябва да добавим пакета Microsoft.EntityFrameworkCore NuGet към проекта.

След това ще модифицираме UserService, за да използваме Entity Framework вместо класа UserRepository. Добавете нов файл с име ApplicationDbContext.cs към проекта UserService и добавете следния код:

using Microsoft.EntityFrameworkCore;
using UserService.Models;

namespace UserService.Data
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
            : base(options)
        {
        }
        public DbSet<User> Users { get; set; }
        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            modelBuilder.Entity<User>()
                .HasKey(u => u.Id);
        }
    }
}

В класа ApplicationDbContext ние дефинираме свойство DbSet<User> и заместваме метода OnModelCreating, за да конфигурираме първичния ключ на обекта User.

След това ще модифицираме UserService, за да използваме Entity Framework. Заменете метода CreateUserAsync във файла UserService.cs със следния код:

public async Task<User> CreateUserAsync(User user)
{
    using var transaction = await dbContext.Database.BeginTransactionAsync();

    try
    {
        await dbContext.Users.AddAsync(user);
        await dbContext.SaveChangesAsync();

        await auditTrail.LogUserCreated(user);

        var userCreatedEvent = new UserCreatedEvent
        {
            UserId = user.Id,
            UserName = user.Name,
            UserEmail = user.Email
        };
        await kafkaProducer.ProduceAsync("user-created", userCreatedEvent);

        await transaction.CommitAsync();

        return user;
    }
    catch (Exception ex)
    {
        await transaction.RollbackAsync();
        logger.LogError(ex, "Error creating user");
        throw;
    }
}

В метода CreateUserAsync първо започваме нова транзакция, използвайки метода dbContext.Database.BeginTransactionAsync. След това добавяме новия потребител към таблицата Users с помощта на Entity Framework и запазваме промените с помощта на метода dbContext.SaveChangesAsync.

След това регистрираме създаването на потребител с помощта на услугата UserAuditTrail, създаваме събитие UserCreated с помощта на производителя на Kafka и извършваме транзакцията с помощта на метода transaction.CommitAsync.

Ако възникне изключение по време на транзакцията, ние връщаме назад промените с помощта на метода transaction.RollbackAsync, регистрираме грешката с помощта на екземпляра ILogger и хвърляме отново изключението.

Стъпка 11: Стартирайте микроуслугите

За да стартираме микроуслугите, трябва да стартираме следните услуги по ред:

  1. Стартирайте Kafka сървъра
  2. Стартирайте услугата за одитна пътека
  3. Стартирайте UserService
  4. Стартирайте ApiGateway

За да стартирате Kafka сървъра, можете да следвате документацията на Kafka за вашата платформа.

За да стартирате услугата за одитна пътека, отидете до директорията на проекта AuditTrailService и изпълнете следната команда:

dotnet run

За да стартирате UserService, отидете до директорията на проекта UserService и изпълнете следната команда:

dotnet run

За да стартирате ApiGateway, отидете до директорията на проекта ApiGateway и изпълнете следната команда:

dotnet run

Стъпка 12: Тествайте микроуслугите

За да тествате микроуслугите, можете да използвате инструмент като Postman или curl, за да изпращате HTTP заявки към ApiGateway. Следват някои примерни заявки, които можете да изпратите:

  1. Създайте нов потребител:
curl -X POST -H "Content-Type: application/json" \
    -d '{"name": "John Doe", "email": "[email protected]"}' \
    http://localhost:5000/api/users

2. Вземете всички потребители:

curl http://localhost:5000/api/users

3. Вземете конкретен потребител по ID:

curl http://localhost:5000/api/users/1

Заключение

В този урок демонстрирахме как да изградим архитектура на микроуслуги с помощта на .Net 7, api-gateway, Kafka, оркестратор, одитна пътека и последователност на данните. Внедрихме микроуслугата UserService, която ни позволява да създаваме и извличаме потребители, и демонстрирахме как да използваме Kafka за създаване и консумиране на събития, одитна пътека за регистриране на събития и Entity Framework, за да гарантираме съгласуваност на данните.