Logstash выводит разные поля в разные индексы эластичного поиска

У меня есть Filebeat экземпляр, который отправляет Apache журналы доступа в Logstash. Logstash конвейер преобразует файл и загружает обработанные поля, например (field1, field2 & field3) в elastic search в индекс indexA. Процесс прост и работает. Вот мой pipeline.conf

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "source"
        remove_field => "type"
        remove_field => "tags"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}

Теперь я хочу добавить три других поля field4 и field5 и добавить их в отдельный индекс с именем indexB. Таким образом, в конце indexA содержит field1 field2 и field3, а IndexB содержит field4 и field5

Пока что это модифицированный pipeline.conf, который, похоже, не работает.

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "type"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }   
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}
filter
{
    mutate
    {
        add_field => { "field4" => "%{source}" }
        add_field => { "field5" => "%{tags}" }
        remove_field => "field1"
        remove_field => "field2"
        remove_field => "field3"
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexB"
    }
}   

Может ли кто-нибудь указать, в чем я ошибаюсь, или какой-либо альтернативный вариант решения.


person Danish Bin Sofwan    schedule 08.08.2017    source источник


Ответы (1)


Вам необходимо продублировать свои события с помощью clone filter . Затем вы можете добавить нужные поля к каждому соответствующему событию и погрузить их в два разных индекса ES:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "type"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    clone {
        clones => ["log1", "log2"]
    }
    if [type] == "log1" {
        mutate
        {
            add_field => { "field1" => "%{access_time}" }
            add_field => { "field2" => "%{host}" }
            add_field => { "field3" => "%{read_timestamp}" }
        }
    } else {   
        mutate
        {
            add_field => { "field4" => "%{source}" }
            add_field => { "field5" => "%{tags}" }
        }
    }
}
output {
    if [type] == "log1" {
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexA"
        }
    } else {   
        elasticsearch{
            hosts => ["localhost:9200"]
            index => "indexB"
        }
    }
}   
person Val    schedule 08.08.2017
comment
Это отлично помогает моему делу. Большое спасибо за изменение кода. - person Danish Bin Sofwan; 09.08.2017
comment
Отлично, рад, что помог! - person Val; 09.08.2017