Logstash — как запускать задачи Celery через RabbitMQ

Может кто-нибудь объяснить мне, как я могу запускать задачи Celery через Logstash? Является ли это возможным?

Если я попытаюсь сделать это в PHP через библиотеку 'php-amqplib', все будет нормально: (без использования Logstash)

$connection = new AMQPStreamConnection(
    'rabbitmq.local',
    5672,
    'guest',
    'guest'
);
$channel = $connection->channel();
$channel->queue_declare(
    'celery',
    false,
    true,
    false,
    false
);
$taskId = rand(1000, 10000);
$props = array(
    'content_type' => 'application/json',
    'content_encoding' => 'utf-8',
);

$body = array(
    'task'      => 'process_next_task',
    'lang'      => 'py',
    'args'      => array('ktest' => 'vtest'),
    'kwargs'    => array('ktest' => 'vtest'),
    'origin'    => '@'.'mytest',
    'id'        => $taskId,
);

$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');

Согласно документам Celery:

http://docs.celeryproject.org/en/latest/internals/protocol.html

Я пытаюсь отправить запрос в формате json, это мой фильтр Logstash:

ruby 
{
    remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
    code => "
        event.set('properties', 
        {
            :content_type => 'application/json',
            :content_encoding => 'utf-8'
        })
    "
}

И ответ сельдерея:

[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}

По сути, Celery не может декодировать мой формат сообщения или лучше... Я не могу установить запрос в формате JSON :)

Это сводит меня с ума, заранее спасибо за любые подсказки :)

Забыл, это мой выходной плагин в Logstash

rabbitmq
            {
                key             => "celery"
                exchange        => "celery"
                exchange_type   => "direct"
                user            => "${RABBITMQ_USER}"
                password        => "${RABBITMQ_PASSWORD}"
                host            => "${RABBITMQ_HOST}"
                port            => "${RABBITMQ_PORT}"
                durable         => true
                persistent      => true
                codec           => json

            }

person Dario    schedule 05.05.2017    source источник


Ответы (2)


Из информации, представленной в этом вопросе, вы можете 'т.

Когда вы играете с событием в фильтре ruby, вы на самом деле играете с тем, что будет помещено в тело сообщения, в то время как вы хотели бы установить заголовки и свойства rabbitmq вашего сообщения.

Пока эта функциональность не будет решена, я не думаю, что вы сможете добиться этого, если, конечно, вы не реализуете это сами. Ведь плагин доступен на github.

person Olivier    schedule 06.05.2017

Как сказал Оливье, сейчас это невозможно, но я создал запрос на включение в официальный проект. https://github.com/logstash-plugins/logstash-output-rabbitmq/pull/59

Если вы ищете рабочую версию, взгляните на мой клон:

https://github.com/useless-stuff/logstash-output-rabbitmq

Вы должны серьезно бояться этого кода :)

Я совершенно далек от того, чтобы быть Ruby-разработчиком

Но это работает :)

person Dario    schedule 08.05.2017