Можно ли использовать InMemory Transport для конкурирующих потребителей в Rebus?

Можно ли использовать Rebus InMemory Transport для реализации конкурирующих потребителей, таких как образец RabbitScaleout?

Контекст: у меня есть монолитное приложение, которое мне нужно разбить и масштабировать на определенные части. Мы должны поддерживать монолитное развертывание, работая над контейнерным/облачным развертыванием.

Я намеревался провести рефакторинг, использовать Rebus для координации между службами и использовать InMemoryTransport для монолита и RabbitMQ для контейнерных служб. Будет ли это работать?

В приведенном ниже коде вызываются оба обработчика.

class Program
    {
        static void Main(string[] args)
        {
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Register(() => new Handler1());
                activator.Register(() => new Handler2());

                Configure.With(activator)
                    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "queue"))
                    .Logging( t=> t.None())
                    .Routing( r=> r.TypeBased().Map<string>("queue"))
                    .Start();


                activator.Bus.Send("test");

                Thread.Sleep(1000);
                    
            }
        }
    }


    class Handler1 : IHandleMessages<String>
    {
        public async Task Handle(String message)
        {
            Console.WriteLine("Handler1 " + message);
        }
    }

    class Handler2 : IHandleMessages<String>
    {
        public async Task Handle(string message)
        {
            Console.WriteLine("Handler2 " + message);
        }
    }

person rareshb    schedule 02.12.2020    source источник


Ответы (1)


Транспорт в памяти, безусловно, можно использовать в сценарии конкурирующих потребителей.

Хитрость заключается в том, чтобы передать один и тот же экземпляр InMemNetwork двум экземплярам шины с одинаковым именем входной очереди — тогда они будут распределять сообщения между всеми экземплярами.

// buses will be connected on this network
var network = new InMemNetwork();

using (var activator1 = new BuiltinHandlerActivator())
using (var activator2 = new BuiltinHandlerActivator())
{
    // when we're doing "competing consumers", we should probably execute
    // the same logic – therefore, we register the same handler twice here:
    activator1.Register(() => new Handler1());
    activator2.Register(() => new Handler1());

    // start bus 1
    Configure.With(activator1)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // start bus 2
    Configure.With(activator2)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // always AWAIT here
    await activator1.Bus.Send("test");
    //
    // - if that's not possible, use the sync bus:
    activator1.Bus.Advanced.SyncBus.Send("test");

    Thread.Sleep(1000);
}

Хотя это сработает, я не уверен, что буду его рекомендовать — in-mem-транспорт из-за того, что он находится в памяти, не обеспечивает никакой долговечности, поэтому вы потеряете все сообщения во всех очередях, если InMemNetwork экземпляр потерян (например, из-за перезапуска процесса или чего-то еще).

Кроме того, если цель состоит в том, чтобы распараллелить обработку сообщений, я не уверен, что вы захотите этого, потому что шины будут работать в одном и том же процессе и, таким образом, конкурировать за одни и те же ресурсы, поэтому эффект от этого будет просто быть другим видом параллелизма... и вы могли бы более легко распараллелить обработку сообщений либо

а) усиление параллелизма

Configure.With(...)
    .(...)
    .Options(o => o.SetMaxParallelism(100))
    .Start();

(здесь позволяет обрабатывать до 100 сообщений параллельно, если их работа основана на Task - обычно это связано с вводом-выводом),

или б) увеличение количества рабочих потоков

Configure.With(...)
    .(...)
    .Options(o => o.SetNumberOfWorkers(10))
    .Start();

(что в основном имеет смысл, если ваш транспорт не имеет API на основе Task) (здесь настройка Rebus для создания 10 выделенных потоков для обработки сообщений).

person mookid8000    schedule 03.12.2020
comment
Спасибо mookid8000. Я проверил это, и это работает. Моя идея состояла в том, чтобы иметь одну и ту же топологию очередей/потребителей, как в памяти, так и в распределенной, только с различным количеством потребителей, так что запуск в одном процессе в памяти или в распределенной памяти будет в основном вопросом конфигурации. Я все еще расследую, однако. - person rareshb; 04.12.2020