Можно ли вызвать публикацию Bunny::Exchange из отложенной работы?

У меня есть приложение Rails, которое иногда публикует сообщения в очередь RabbitMQ, используя гем "Bunny". Вот настройка:

# config/initializers/bunny.rb
$mq_connection = Bunny.new
$mq_connection.start
$mq_channel = $mq_connection.create_channel

В любом месте приложения я могу позвонить:

exchange = $mq_channel.default_exchange
exchange.publish(msg.to_json, persistent: true, routing_key: '...')

Это прекрасно работает, если я вызываю его из приложения или из консоли, но не работает, если вызывается из задания DelayedJob. Никаких исключений не возникает, но сообщение просто не отправляется.

Попытка с синглтоном:

Похоже, глобальные переменные, такие как $mq_channel, не могут быть найдены DelayedJob, поэтому я создал одноэлементную модель для ее хранения:

class RabbitMq
  include Singleton

  attr_accessor :connection, :channel

  def exchange
    channel.default_exchange
  end

  def setup
    self.connection = Bunny.new
    self.connection.start
    self.channel = connection.create_channel
  end

end

И я вызываю настройку из моего инициализатора:

# config/initializers/bunny.rb
RabbitMq.instance.setup

Но это тоже не работает. Задание завершается без ошибок, но ничего не публикуется.

Любая идея, как это сделать? Публикация сообщений в RabbitMQ из фонового рабочего процесса, такого как DJ, должна быть довольно распространенным явлением.


person jibai31    schedule 19.05.2017    source источник


Ответы (1)


Вот как я это делаю:

class Messaging::Publisher

  class << self

    def publish(message)
      new(message).publish
    end

  end # Class Methods

  #=========================================================================
  # Instance Methods      
  #=========================================================================

    def initialize(message)
      @message = message
    end

    def publish
      connection = Bunny.new(ENV['CLOUDAMQP_URL'])
      connection.start
      channel = connection.create_channel
      queue_name = "#{ENV['app_name']}.#{message.keys.first.to_s.pluralize}_queue"
      queue = channel.queue(queue_name, durable: true)
      channel.default_exchange.publish(message.to_json, :routing_key => queue.name)
      channel.close
      connection.stop
      true
    end

  private

    def message()   @message    end

end

Я вызываю это как из своего приложения (синхронно), так и из фоновых заданий (асинхронно). Что-то вроде этого:

class ServiceRequests::CreateManager < ServiceRequests::ManagerBase

  class << self

  private

  end # Class Methods

  #=========================================================================
  # Instance Methods
  #=========================================================================

    def manage
      Messaging::Publisher.publish service_request_message
    end

  private

    def service_request_message
      {
        service_request: {
          provider: {
            name: "Foo::Bar"
          },
          params: {
            baz: 'qux'
          }
        }
      }
    end

end
person jvillian    schedule 19.05.2017
comment
Это действительно работает, если я создаю новое соединение каждый раз, когда мне нужно опубликовать сообщение. Спасибо! - person jibai31; 22.05.2017