У меня есть Filebeat
экземпляр, который отправляет Apache
журналы доступа в Logstash
. Logstash
конвейер преобразует файл и загружает обработанные поля, например (field1, field2 & field3) в elastic search
в индекс indexA. Процесс прост и работает. Вот мой pipeline.conf
input{
beats{
port => "5043"
}
}
filter
{
grok
{
patterns_dir => ["/usr/share/logstash/patterns"]
match =>{ "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
"%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ]
}
remove_field => "@version"
remove_field => "beat"
remove_field => "input_type"
remove_field => "source"
remove_field => "type"
remove_field => "tags"
remove_field => "http_version"
remove_field => "@timestamp"
remove_field => "message"
}
mutate
{
add_field => { "field1" => "%{access_time}" }
add_field => { "field2" => "%{host}" }
add_field => { "field3" => "%{read_timestamp}" }
}
}
output {
elasticsearch{
hosts => ["localhost:9200"]
index => "indexA"
}
}
Теперь я хочу добавить три других поля field4 и field5 и добавить их в отдельный индекс с именем indexB. Таким образом, в конце indexA содержит field1 field2 и field3, а IndexB содержит field4 и field5
Пока что это модифицированный pipeline.conf, который, похоже, не работает.
input{
beats{
port => "5043"
}
}
filter
{
grok
{
patterns_dir => ["/usr/share/logstash/patterns"]
match =>{ "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
"%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ]
}
remove_field => "@version"
remove_field => "beat"
remove_field => "input_type"
remove_field => "type"
remove_field => "http_version"
remove_field => "@timestamp"
remove_field => "message"
}
mutate
{
add_field => { "field1" => "%{access_time}" }
add_field => { "field2" => "%{host}" }
add_field => { "field3" => "%{read_timestamp}" }
}
}
output {
elasticsearch{
hosts => ["localhost:9200"]
index => "indexA"
}
}
filter
{
mutate
{
add_field => { "field4" => "%{source}" }
add_field => { "field5" => "%{tags}" }
remove_field => "field1"
remove_field => "field2"
remove_field => "field3"
}
}
output {
elasticsearch{
hosts => ["localhost:9200"]
index => "indexB"
}
}
Может ли кто-нибудь указать, в чем я ошибаюсь, или какой-либо альтернативный вариант решения.