Я пытаюсь реализовать MassTransit, не зависящую от типа транспорта (например, служебную шину Azure, RabbitMQ и т. Д.). Я отделил логику от контейнера DI, поскольку контейнер DI добавляет область действия интерфейса к конкретному классу и имеет фабрику для определения транспорта на основе конфигурации. Это работает для публикации и отправки, но когда я пытаюсь выполнить запрос / ответ или использование, создаются обмены / очереди для RabbitMq, а также темы и очередь для служебной шины Azure, но сообщение не получено.
Я думаю, что затем я могу взять библиотеку классов, которая выполняет всю логику и переносить из проекта в проект, не имея большого фрагмента всех потребителей, поскольку они могут отличаться от проекта к проекту.
Я чувствую, что упускаю что-то важное. Это не будет работать без выполнения .AddConsumer в контейнере DI после использования AddMassTransit для добавления конфигурации шины в контейнер DI, что является противоположностью тому, что я пытаюсь.
Я создал тестовый проект интеграции, чтобы попытаться использовать TDD, при этом отлаживая эту функциональность перед добавлением в службу, и здесь меня заблокировали. Переменная ответа никогда не возвращается, и в конце концов я получаю тайм-аут. Проект интеграции зависит только от библиотеки классов MassTransit и библиотеки классов конфигурации и фактически обращается непосредственно к транспорту (в отличие от имитации над ним).
Я близок или мне нужно отказаться от этого квеста?
По коду из моего текущего проекта:
Startup.cs
Configuration.IConfigurationProvider cfg = new ServiceFabricConfigurationProvider();
switch (cfg.MessageService.MessagingService)
{
case "AzureServiceBus":
services.AddScoped<IMassTransitTransport, MassTransitAzureServiceBusTransport>();
break;
case "RabbitMq":
services.AddScoped<IMassTransitTransport, MassTransitRabbitMqTransport>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddScoped<IMessagingService, MassTransitMessagingService>();
Транспортные классы очень похожи, поскольку имеют общий интерфейс - оба показаны ниже.
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitAzureServiceBusTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host(configProvider.AzureServiceBus.AzureServiceBusConnectionString);
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
MassTransitRabbitMqTransport
public sealed class MassTransitRabbitMqTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitRabbitMqTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(configProvider.Rabbit.HostAddress), host =>
{
host.Username(configProvider.Rabbit.Username);
host.Password(configProvider.Rabbit.Password);
});
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
}
Служба обмена сообщениями
public interface IMessagingService
{
Task Publish<T>(object payload) where T : class;
Task Send<T>(object payload) where T : class;
}
public class MassTransitMessagingService : IMessagingService
{
readonly IMassTransitTransport massTransitTransport;
public MassTransitMessagingService(IMassTransitTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish<T>(object payload) where T : class
{
await massTransitTransport.BusControl.Publish<T>(payload);
}
public async Task Send<T>(object payload) where T : class
{
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(new Uri(massTransitTransport.BusControl.Address, typeof(T).ToString()));
await endpoint.Send<T>(payload);
}
}
Интерфейсы и классы запросов / ответов
public interface IEventRequest
{
Guid EventGuid { get; set; }
string Message { get; set; }
}
public interface IEventResponse
{
Guid EventGuid { get; set; }
string RequestMessage { get; set; }
string ResponseMessage { get; set; }
}
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new
{
context.Message.EventGuid,
context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
Метод тестирования для выполнения запроса / ответа
[Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<ConsumerClass>(payload);
}
Обновление 1
На основании отзывов Криса Паттерсона я внес следующие изменения. Кажется, я вызвал путаницу, показав фрагмент из класса запуска.
На самом деле здесь происходит две вещи: проект API, который содержит приведенные выше части и отрефакторированные фрагменты кода ниже, и проект интеграционного тестирования.
Что касается проекта интеграционного тестирования, на самом деле используются только 3 библиотеки классов. Проект интеграционного тестирования не содержит контейнера DI, поскольку это тестовый проект, и я хочу посмотреть, можно ли отделить логику от контейнера DI. Кроме того, у меня нет ILogger в проекте интеграционного тестирования.
Все решение представляет собой проект API, у которого есть запуск со встроенным контейнером DI и контроллером, реализующим публикацию. Моя попытка состоит в том, чтобы отделить логику MassTransit от контейнера DI, чтобы проект MassTransitTransport можно было использовать где-то еще и чтобы можно было использовать любой транспорт, поддерживаемый MassTransit. Мой вопрос (ы) в том, является ли это плохой идеей с точки зрения потребления сообщения (т.е. невозможно сделать, если не используется контейнер DI), или можно ли это сделать. Если это можно сделать, что мне не хватает / что не так?
Конфигурация - для использования IConfigurationProvider MassTransitTransport - содержит ConsumerClass, IEventRequest, IEventResponse, EventResponse, IMassTransitTransport, MassTransitTransportRabbitMqTransport, MassTransitZaureTransportRabbitMqTransport, MassTransitZaureTransportBusTransport, MassTransitZaureTransportTransport, MassTransagingSowerSystems, MassConsumerFreeSystemService - Massaging
Создали EventResponse и использовали его в ConsumerClass
public class EventResponse : IEventResponse
{
public Guid EventGuid { get; set; }
public string RequestMessage { get; set; }
public string ResponseMessage { get; set; }
}
Потребительский класс
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new EventResponse()
{
EventGuid = context.Message.EventGuid,
RequestMessage = context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
Тестовый проект интеграции
[Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
//var transport = new MassTransitAzureServiceBusTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<IEventResponse>(payload);
}
Получено сообщение об ошибке
MassTransit.RequestTimeoutException: Тайм-аут ожидания ответа, RequestId: 7a000000-9a3c-0005-8037-08d80882f498
payloadResponse
- анонимный тип, что запрещено. Включение ведения журнала (через ILoggerFactory, см. Документацию) показало бы вам ошибку. Вам нужно использовать ResponseAsync ‹T› (). - person Chris Patterson   schedule 04.06.2020GetResponse<IEventResponse>()
- потребитель не является типом сообщения для ответа. - person Chris Patterson   schedule 04.06.2020ConfigureServices
метод заключается в настройке вещей, а не в запуске инфраструктуры. Вы можете зарегистрировать размещенную службу, чтобы запускать и останавливать автобусservices.AddSingleton<IHostedService>(new BusHostedService(bus))
- person Alexey Zimarev   schedule 04.06.2020