Настройте журналы из файла в beats.config файла Lostash.

Я использую ELK с filebeat. Я отправляю журналы из filebeat в Logstash, а оттуда в Elastic и визуализирую в Kibana. Я вставляю результат json, который отображается в результате журнала kibana, как показано ниже:

    {
  "_index": "filebeat-6.4.2-2018.10.30",
  "_type": "doc",
  "_source": {
    "@timestamp": "2018-10-30T09:15:31.697Z",
    "fields": {
      "server": "server1"
    },
    "prospector": {
      "type": "log"
    },
    "host": {
      "name": "kushmathapa"
    },
    "message": "{ \"datetime\": \"2018-10-23T18:04:00.811660Z\", \"level\": \"ERROR\", \"message\": \"No response from remote. Handshake timed out or transport failure detector triggered.\" }",
    "source": "C:\\logs\\batch-portal\\error.json",
    "input": {
      "type": "log"
    },
    "beat": {
      "name": "kushmathapa",
      "hostname": "kushmathapa",
      "version": "6.4.2"
    },
    "offset": 0,
    "tags": [
      "lighthouse1",
      "controller",
      "trt"
    ]
  },
  "fields": {
    "@timestamp": [
      "2018-10-30T09:15:31.697Z"
    ]
  }
}

Я хочу, чтобы это отображалось как

    {
  "_index": "filebeat-6.4.2-2018.10.30",
  "_type": "doc",
  "_source": {
    "@timestamp": "2018-10-30T09:15:31.697Z",
    "fields": {
      "server": "server1"
    },
    "prospector": {
      "type": "log"
    },
    "host": {
      "name": "kushmathapa"
    },
    "datetime": 2018-10-23T18:04:00.811660Z,
    "log_level": ERROR,
    "message": "{ \"No response from remote. Handshake timed out or transport failure detector triggered.\" }",
    "source": "C:\\logs\\batch-portal\\error.json",
    "input": {
      "type": "log"
    },
    "beat": {
      "name": "kushmathapa",
      "hostname": "kushmathapa",
      "version": "6.4.2"
    },
    "offset": 0,
    "tags": [
      "lighthouse1",
      "controller",
      "trt"
    ]
  },
  "fields": {
    "@timestamp": [
      "2018-10-30T09:15:31.697Z"
    ]
  }
}

Мой файл beats.config сейчас выглядит так

  input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" 
  } stdout {
    codec => rubydebug { metadata => true }
  }
}

Я применил фильтры, но мне что-то не хватает.


person wingskush    schedule 29.10.2018    source источник


Ответы (2)


Вы можете использовать файл конфигурации, который выглядит примерно так. В фильтре grok добавьте формат вашего журнала, который вы хотите загрузить в свой elasticsearch (например, обратитесь к упомянутой конфигурации).

input {
beats {
port => 5044
id => "my_plugin_id"
tags => ["logs"]
type => "abc"
}
}
filter {
if [type] == "abc" {
 mutate {
    gsub => [ "message", "\r", "" ]
}

    grok {
        break_on_match => true
                match => {
                         "message" => [
                         "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:message}"
                         ]
                  }
                  overwrite => [ "message" ]
    }

    grok {
        break_on_match => true
                match => {
                         "message" => [
                          "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:message}"
                         ]
                  }
                  overwrite => [ "message" ]
    }

date {
   match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,SSS" ]
} 
}
}
output {
if [type] == "abc" {
elasticsearch { 
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "logfiles"
} 
}
else {
elasticsearch { 
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "task_log"
} 
}
stdout {
codec => rubydebug { metadata => true }
}
}
person Coder    schedule 30.10.2018
comment
Поскольку сообщение находится в формате json (обновления в вопросе), фильтр grok не работает, поскольку сообщение находится в формате json - person wingskush; 30.10.2018
comment
Обратитесь к этому stackoverflow.com/questions/38869886/ Я думаю, вы получите представление о своем обновленном вопросе. - person Coder; 31.10.2018

Logstash должен знать, что поле message, которое вы получаете, находится в формате JSON. Здесь вы можете использовать фильтр json и получить почти все, что вам нужно, из коробки:

filter {
    json {
        target => "message"
    }
}

Вы можете использовать мутацию или добавлять / удалять поля, чтобы переименовать такие вещи, как level в log.level и datetime в @datetime, если эти имена необходимы.

person vase    schedule 29.10.2018