Реализация шины только публикации в MassTransit v3 с помощью C # и RabbitMQ

Я пытаюсь реализовать шину только для публикации в MassTransit v3 с C # и RabbitMQ, где у шины нет потребителя. Идея состоит в том, что сообщения будут публиковаться и помещаться в очередь, а затем отдельный микросервис будет потреблять сообщения из очереди. Глядя на этот ответ SO, необходимо указать конечные точки приема, чтобы сообщения действительно в очереди. Однако это, похоже, противоречит распространенным ошибкам в Документы MassTransit, в которых указано If you need to only send or publish messages, don’t create any receive endpoints.

Вот пример кода:

    public class Program
    {
        static void Main(string[] args)
        {
            var bus = BusConfigurator.ConfigureBus();

            bus.Start();

            bus.Publish<IItemToQueue>(new ItemToQueue { Text = "Hello World" }).Wait();

            Console.ReadKey();

            bus.Stop();
        }
    }

    public static class BusConfigurator
    {
        public static IBusControl ConfigureBus()
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst =>
                {
                    hst.Username("guest");
                    hst.Password("guest");
                });

                cfg.ReceiveEndpoint(host, "queuename", e =>
                {
                    e.Consumer<MyConsumer>();
                });
            });

            return bus;
        }
    }

    public interface IItemToQueue
    {
        string Text { get; set; }
    }

    public class ItemToQueue : IItemToQueue
    {
        public string Text { get; set; }
    }

    public class MyConsumer : IConsumer<IItemToQueue>
    {
        public async Task Consume(ConsumeContext<IItemToQueue> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Text);
        }
    }

В этом примере я получаю сообщение в очереди RabbitMQ, как и ожидалось, и оно потребляется MyConsumer, который записывает Hello World в консоль, а затем сообщение удаляется из очереди.

Однако, когда я удаляю следующий код из приведенного выше и повторно запускаю образец:

cfg.ReceiveEndpoint(host, RabbitMqConstants.ValidationQueue, e =>
{
    e.Consumer<MyConsumer>();
}); 

Создается временная очередь (с сгенерированным именем), и сообщение, кажется, никогда не помещается во временную очередь. Затем эта очередь удаляется при остановке автобуса.

У меня проблема с указанием ReceiveEndpoint, сообщения будут использоваться и удаляться из очереди в программе-издателе (это означает, что микросервис-потребитель не будет обрабатывать элементы в очереди). Без указанного RecieveEndpoint используется временная очередь (и микросервис-потребитель не будет знать имя этой временной очереди), сообщение никогда не попадает в очередь, и очередь удаляется при остановке шины, что было бы плохо, если бы программа вышла из строя.

В документах MassTransit есть пример шины только для отправки но это довольно просто, поэтому мне было интересно, есть ли у кого-нибудь предложения?


person Richard Pursehouse    schedule 29.09.2016    source источник
comment
Хотя вам не нужны конечные точки приема для издателей / отправителей, они вам нужны где-то, иначе нет никаких очередей, привязанных к обмену сообщениями, и сообщения никуда не маршрутизируются - и, следовательно, пропадают.   -  person Chris Patterson    schedule 03.10.2016


Ответы (1)


Конечная точка приема должна находиться в вашей службе, отдельно от приложения, предназначенного только для публикации. Таким образом, служба будет иметь конечную точку приема и потреблять сообщения по мере их публикации приложением.

Если у вас есть конечная точка приема в приложении, приложение будет потреблять сообщения, поскольку оно имеет то же имя очереди, которое указано в конечной точке приема.

Что вам нужно сделать, так это создать другую службу с той же конфигурацией (включая конечную точку приема) и вывести конечную точку приема из вашего приложения. В этот момент служба будет иметь конечную точку приема и потреблять сообщения из очереди. Даже если служба остановлена, сообщения будут продолжать доставляться в очередь, и после запуска службы они начнут использовать.

person Chris Patterson    schedule 03.10.2016
comment
Это просто вопрос настройки RabbitMQ в первую очередь? Я нахожусь в аналогичной ситуации, и мне нужно гарантировать, что очередь получателя получит сообщения, как только приложение начнет отправлять сообщения, независимо от того, работает ли служба в этот момент, пока они сохраняются и в конечном итоге будут обработаны . Если после запуска службы все настроено для удовлетворения этого требования, это нормально. Это область, в которой документация может использовать некоторые примеры. Если это сработает, я буду счастлив написать об этом. - person Colin Young; 15.02.2017
comment
Для этого есть проблема в GitHub, по сути, не запускайте, а создайте топологию в RabbitMQ. Проблемы, возникающие из-за подхода к подключению самостоятельно, заключаются в том, что тонкие нюансы в системе типов могут быть неправильно настроены вручную, что может вызвать ошибки при запуске. Однако, если вы отправляете в определенную очередь, существует параметр строки запроса, чтобы отправитель мог привязать обмен к очереди. Это либо bind, либо bindQueue = true. - person Chris Patterson; 15.02.2017
comment
Благодарю. Можете ли вы предоставить ссылку на проблему? Сегодня утром я наконец смог наладить дело. После того, как я запустил один раз с потребителем, я смог удалить его и по-прежнему сохранять сообщения при следующем запуске. Я нахожусь в режиме разработки / исследования, очевидно, что это не план для производства :) Вы действительно поднимаете некоторые вопросы о типах и сообщениях о версиях, о которых я думал, но это тема для другого поста после того, как я немного поиграю - person Colin Young; 15.02.2017