Добавление потребителей без добавления MassTransit в контейнер внедрения зависимостей

Я пытаюсь реализовать MassTransit, не зависящую от типа транспорта (например, служебную шину Azure, RabbitMQ и т. Д.). Я отделил логику от контейнера DI, поскольку контейнер DI добавляет область действия интерфейса к конкретному классу и имеет фабрику для определения транспорта на основе конфигурации. Это работает для публикации и отправки, но когда я пытаюсь выполнить запрос / ответ или использование, создаются обмены / очереди для RabbitMq, а также темы и очередь для служебной шины Azure, но сообщение не получено.

Я думаю, что затем я могу взять библиотеку классов, которая выполняет всю логику и переносить из проекта в проект, не имея большого фрагмента всех потребителей, поскольку они могут отличаться от проекта к проекту.

Я чувствую, что упускаю что-то важное. Это не будет работать без выполнения .AddConsumer в контейнере DI после использования AddMassTransit для добавления конфигурации шины в контейнер DI, что является противоположностью тому, что я пытаюсь.

Я создал тестовый проект интеграции, чтобы попытаться использовать TDD, при этом отлаживая эту функциональность перед добавлением в службу, и здесь меня заблокировали. Переменная ответа никогда не возвращается, и в конце концов я получаю тайм-аут. Проект интеграции зависит только от библиотеки классов MassTransit и библиотеки классов конфигурации и фактически обращается непосредственно к транспорту (в отличие от имитации над ним).

Я близок или мне нужно отказаться от этого квеста?

По коду из моего текущего проекта:

Startup.cs

        Configuration.IConfigurationProvider cfg = new ServiceFabricConfigurationProvider();
        switch (cfg.MessageService.MessagingService)
        {
            case "AzureServiceBus":
                services.AddScoped<IMassTransitTransport, MassTransitAzureServiceBusTransport>();
                break;
            case "RabbitMq":
                services.AddScoped<IMassTransitTransport, MassTransitRabbitMqTransport>();
                break;
            default:
                throw new ArgumentException("Invalid message service");
        };

        services.AddScoped<IMessagingService, MassTransitMessagingService>();

Транспортные классы очень похожи, поскольку имеют общий интерфейс - оба показаны ниже.

IMassTransitTransport

public interface IMassTransitTransport
{
    IBusControl BusControl { get; }
}

MassTransitAzureServiceBusTransport

public sealed class MassTransitAzureServiceBusTransport : IMassTransitTransport
{
    readonly IConfigurationProvider configProvider;

    public MassTransitAzureServiceBusTransport(IConfigurationProvider configProvider)
    {
        this.configProvider = configProvider;
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    public IBusControl BusControl { get; }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingAzureServiceBus(cfg => 
        {
            cfg.Host(configProvider.AzureServiceBus.AzureServiceBusConnectionString);

            cfg.ReceiveEndpoint("MyQueue", e => 
            {
                e.Consumer<ConsumerClass>();
            });
        });
    }

MassTransitRabbitMqTransport

public sealed class MassTransitRabbitMqTransport : IMassTransitTransport
{
    readonly IConfigurationProvider configProvider;

    public MassTransitRabbitMqTransport(IConfigurationProvider configProvider)
    {
        this.configProvider = configProvider;
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    public IBusControl BusControl { get; }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {

            cfg.Host(new Uri(configProvider.Rabbit.HostAddress), host =>
            {
                host.Username(configProvider.Rabbit.Username);
                host.Password(configProvider.Rabbit.Password);
            });

            cfg.ReceiveEndpoint("MyQueue", e => 
            {
                e.Consumer<ConsumerClass>();
            });
        });
    }
}

Служба обмена сообщениями

public interface IMessagingService
{
    Task Publish<T>(object payload) where T : class;
    Task Send<T>(object payload) where T : class;
}

public class MassTransitMessagingService : IMessagingService
{
    readonly IMassTransitTransport massTransitTransport;

    public MassTransitMessagingService(IMassTransitTransport massTransitTransport)
    {
        //transport bus config already happens in massTransitTransport constructor
        this.massTransitTransport = massTransitTransport;
    }

    public async Task Publish<T>(object payload) where T : class
    {
        await massTransitTransport.BusControl.Publish<T>(payload);
    }

    public async Task Send<T>(object payload) where T : class
    {
        var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(new Uri(massTransitTransport.BusControl.Address, typeof(T).ToString()));
        await endpoint.Send<T>(payload);
    }
}

Интерфейсы и классы запросов / ответов

public interface IEventRequest
{
    Guid EventGuid { get; set; }
    string Message { get; set; }
}

public interface IEventResponse
{
    Guid EventGuid { get; set; }
    string RequestMessage { get; set; }
    string ResponseMessage { get; set; }
}

public class ConsumerClass : IConsumer<IEventResponse>
{
    public async Task Consume(ConsumeContext<IEventResponse> context)
    {
        var payloadResponse = new
        {
            context.Message.EventGuid,
            context.Message.RequestMessage,
            ResponseMessage = "This is the response message;"
        };

        await context.RespondAsync(payloadResponse);
    }
}

Метод тестирования для выполнения запроса / ответа

    [Test]
    public async Task SendAndReceiveMessage()
    {
        // arrange
        var config = GetConfiguration();
        var transport = new MassTransitRabbitMqTransport(config);

        var payload = new
        {
            EventGuid = Guid.NewGuid(),
            Message = "This is an event message"
        };

        var clientFactory = transport.BusControl.CreateClientFactory();
        var client = clientFactory.CreateRequestClient<IEventRequest>();
        var response = await client.GetResponse<ConsumerClass>(payload);

    }

Обновление 1

На основании отзывов Криса Паттерсона я внес следующие изменения. Кажется, я вызвал путаницу, показав фрагмент из класса запуска.

На самом деле здесь происходит две вещи: проект API, который содержит приведенные выше части и отрефакторированные фрагменты кода ниже, и проект интеграционного тестирования.

Что касается проекта интеграционного тестирования, на самом деле используются только 3 библиотеки классов. Проект интеграционного тестирования не содержит контейнера DI, поскольку это тестовый проект, и я хочу посмотреть, можно ли отделить логику от контейнера DI. Кроме того, у меня нет ILogger в проекте интеграционного тестирования.

Все решение представляет собой проект API, у которого есть запуск со встроенным контейнером DI и контроллером, реализующим публикацию. Моя попытка состоит в том, чтобы отделить логику MassTransit от контейнера DI, чтобы проект MassTransitTransport можно было использовать где-то еще и чтобы можно было использовать любой транспорт, поддерживаемый MassTransit. Мой вопрос (ы) в том, является ли это плохой идеей с точки зрения потребления сообщения (т.е. невозможно сделать, если не используется контейнер DI), или можно ли это сделать. Если это можно сделать, что мне не хватает / что не так?

Конфигурация - для использования IConfigurationProvider MassTransitTransport - содержит ConsumerClass, IEventRequest, IEventResponse, EventResponse, IMassTransitTransport, MassTransitTransportRabbitMqTransport, MassTransitZaureTransportRabbitMqTransport, MassTransitZaureTransportBusTransport, MassTransitZaureTransportTransport, MassTransagingSowerSystems, MassConsumerFreeSystemService - Massaging

Создали EventResponse и использовали его в ConsumerClass

public class EventResponse : IEventResponse
{
    public Guid EventGuid { get; set; }
    public string RequestMessage { get; set; }
    public string ResponseMessage { get; set; }
}

Потребительский класс

public class ConsumerClass : IConsumer<IEventResponse>
{
    public async Task Consume(ConsumeContext<IEventResponse> context)
    {
        var payloadResponse = new EventResponse()
        {
            EventGuid = context.Message.EventGuid,
            RequestMessage = context.Message.RequestMessage,
            ResponseMessage = "This is the response message;"
        };

        await context.RespondAsync(payloadResponse);
    }
}

Тестовый проект интеграции

    [Test]
    public async Task SendAndReceiveMessage()
    {
        // arrange
        var config = GetConfiguration();
        var transport = new MassTransitRabbitMqTransport(config);
        //var transport = new MassTransitAzureServiceBusTransport(config);

        var payload = new
        {
            EventGuid = Guid.NewGuid(),
            Message = "This is an event message"
        };

        var clientFactory = transport.BusControl.CreateClientFactory();
        var client = clientFactory.CreateRequestClient<IEventRequest>();
        var response = await client.GetResponse<IEventResponse>(payload);
    }

Получено сообщение об ошибке

MassTransit.RequestTimeoutException: Тайм-аут ожидания ответа, RequestId: 7a000000-9a3c-0005-8037-08d80882f498


person Lee Z    schedule 03.06.2020    source источник
comment
Ваш payloadResponse - анонимный тип, что запрещено. Включение ведения журнала (через ILoggerFactory, см. Документацию) показало бы вам ошибку. Вам нужно использовать ResponseAsync ‹T› ().   -  person Chris Patterson    schedule 04.06.2020
comment
Кроме того, GetResponse<IEventResponse>() - потребитель не является типом сообщения для ответа.   -  person Chris Patterson    schedule 04.06.2020
comment
Кроме того, я бы избегал запускать шину или выполнять какие-либо операции ввода-вывода в конфигурации службы. ConfigureServices метод заключается в настройке вещей, а не в запуске инфраструктуры. Вы можете зарегистрировать размещенную службу, чтобы запускать и останавливать автобус services.AddSingleton<IHostedService>(new BusHostedService(bus))   -  person Alexey Zimarev    schedule 04.06.2020
comment
@ChrisPatterson Я провел некоторый рефакторинг, но все еще получаю таймауты. Я добавил Обновление 1, чтобы попытаться прояснить свой вопрос, поскольку вводил в заблуждение. Я пытаюсь максимально отделить проект MassTransit от контейнера DI, поскольку я планирую подключить этот компонент к другим решениям. Пожалуйста, дайте мне знать, если это не лучшая идея (т.е. MassTransit зависит от контейнера DI?)   -  person Lee Z    schedule 04.06.2020
comment
MassTransit вообще не требует DI.   -  person Chris Patterson    schedule 04.06.2020
comment
@ChrisPatterson Тогда я запутался. В созданном мною тестовом проекте интеграции я никогда не прохожу мимо строки: var response = await client.GetResponse ‹IEventResponse› (полезная нагрузка); Я думал, что он вызовет ConsumerClass, но это не так. Есть установка, которую мне не хватает?   -  person Lee Z    schedule 04.06.2020
comment
Вы вызываете StartAsync () в конструкторе, а не ожидаете его. Обычно время ответа истекает из-за того, что автобус не заводится.   -  person Chris Patterson    schedule 04.06.2020
comment
Я бы предложил использовать тестовую программу для модульного тестирования, как описано в документации. Вы перетаскиваете в свои тесты слишком много не связанных между собой проблем. И добавление тонны сложности вокруг себя, пытаясь быть агностиком - возможно, подумайте, где вы проведете границы между бизнес-логикой, которая зависит только от MassTransit, а затем создадите отдельные сборки для интеграции вашего транспорта.   -  person Chris Patterson    schedule 04.06.2020
comment
Спасибо, Крис, было 2 проблемы. Первым был StartAsync вместо Start. Во-вторых, ConsumerClass не был реализован правильно. Он должен был унаследовать от IConsumer ‹IEventRequest› и иметь метод как общедоступную асинхронную задачу Consume (контекст ConsumeContext ‹IEventRequest›). После этого все заработало. Я действительно думаю, что путь DI будет чище и, вероятно, пойдет в этом направлении. Спасибо за вашу помощь - мы очень благодарны   -  person Lee Z    schedule 04.06.2020


Ответы (1)


Я добавляю ответ в комментарий, чтобы закрыть эту проблему:

было 2 вопроса. Первым был StartAsync вместо Start. Во-вторых, ConsumerClass не был реализован правильно. Он должен был унаследовать от IConsumer и иметь метод как общедоступную асинхронную задачу Consume (контекст ConsumeContext). После этого все заработало.

person Tom Luo    schedule 26.06.2020