Logstash - списък за анализ на JSON

Наистина харесвам ELK за анализиране на регистрационните файлове. Въпреки това съм заседнал в момент, в който трябва да анализира списък с речници. Следват моите дневници: -

IP - - 0.000 0.000 [24/May/2015:06:51:13 +0000] *"POST /c.gif HTTP/1.1"* 200 4 * user_id=UserID&package_name=SomePackageName&model=Titanium+S202&country_code=in&android_id=AndroidID&eT=1432450271859&eTz=GMT%2B05%3A30&events=%5B%7B%22eV%22%3A%22com.olx.southasia%22%2C%22eC%22%3A%22appUpdate%22%2C%22eA%22%3A%22app_activated%22%2C%22eTz%22%3A%22GMT%2B05%3A30%22%2C%22eT%22%3A%221432386324909%22%2C%22eL%22%3A%22packageName%22%7D%5D * "-" "-" "-"

URL декодираната версия на горния журнал е

IP - - 0.000 0.000 [24/May/2015:06:51:13  0000] *"POST /c.gif HTTP/1.1"* 200 4 * user_id=UserID&package_name=SomePackageName&model=Titanium S202&country_code=in&android_id=AndroidID&eT=1432450271859&eTz=GMT+05:30&events=[{"eV":"com.olx.southasia","eC":"appUpdate","eA":"app_activated","eTz":"GMT+05:30","eT":"1432386324909","eL":"packageName"}] * "-" "-" "-"

Където и да се опитам да го анализирам, ми показва _jsonparsefailure. Прегледах и този въпрос и също така минах през различни форуми , но не намери идеално решение за същото. Как мога да анализирам json списък в logstash?? Ако не съществува досега, какво може да се заобиколи за същото.??

Следва моят конфигурационен файл.

filter {
    mutate {
        gsub => [
            "message", "\+", "%20"
        ]
    }

    urldecode{
        field => "message"
    }
    grok {
        match => [
            'message', '%{IP:clientip}%{GREEDYDATA} \[%{GREEDYDATA:timestamp}\] \*"%{WORD:method}%{GREEDYDATA}'
        ]
    }

    kv {
        field_split => "&?"
    }

    json{
        source => "events"
    }

    geoip {
        source => "clientip"
    }
}

person PythonEnthusiast    schedule 04.08.2015    source източник
comment
Филтърът kv{} трябва да ви създаде поле, наречено събития, което съдържа вашия json. Дали?   -  person Alain Collins    schedule 04.08.2015
comment
Да, така е. Но това не анализира стойностите на списъка на events   -  person PythonEnthusiast    schedule 04.08.2015
comment
Разбирам това, но ако kv{} не работи, json{} няма да има вход за обработка. Ако вземете стойността на събитието след kv{} филтъра към json валидатор, добре ли е?   -  person Alain Collins    schedule 04.08.2015
comment
Да, това е добре. Филтърът json успешно анализира json под формата на { "foo":"bar" } JSON, който не може да анализира, е [ { "foo":"bar" } ]   -  person PythonEnthusiast    schedule 04.08.2015


Отговори (1)


Този въпрос е точно копие на Parse json в списък в logstash . Дори със същите записи в дневника?! Може ли някой да осмисли това?

Можете да видите моя отговор там, но аз ще го обобщя за вас... опция д) е може би най-добрият подход


Очевидно получавате jsonparsefailure поради квадратните скоби. Като заобиколно решение можете ръчно да ги премахнете. Добавете следния филтър за мутиране след вашия kv и преди вашия json филтър:

mutate  {
    gsub => [ "events","\]",""]
    gsub => [ "events","\[",""]
}

Това обаче не работи за вход като [{"foo":"bar"},{"foo":"bar1"}]. Ето 4 варианта:

Вариант а) грозен gsub

Грозно решение би било друг gsub:

gsub => [ "event","\},\{",","]

Но това ще премахне вътрешните отношения, така че предполагам, че не искате да го правите.

Вариант b) разделяне

По-добър подход може да бъде използването на разделен филтър:

split {
    field => "event"
    terminator => ","
}
mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
   }
json{
    source=> "event"
}

Това ще генерира множество събития. (Първо с foo = bar и второ с foo1 = bar1.)

Опция c) мутиране на разделяне

Може да искате да имате всички стойности в едно logstash събитие. Можете да използвате филтъра mutate => split, за да генерирате масив и да анализирате json, ако съществува запис. За съжаление ще трябва да зададете условие за всеки запис, защото logstash не поддържа цикли в своята конфигурация.

mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
    split => [ "event", "," ]
   }

json{
    source=> "event[0]"
    target => "result[0]"
}

if 'event[1]' {
    json{
        source=> "event[1]"
        target => "result[1]"
    }
    if 'event[2]' {
        json{
            source=> "event[2]"
            target => "result[2]"
        }
    }
    # You would have to specify more conditionals if you expect even more dictionaries
}

Опция d) Ruby1

Следните работи (след вашия kv филтър): По-скоро използвайте опция e)

mutate  {
    gsub => [ "event","\]",""]
    gsub => [ "event","\[",""]
}

ruby  {
    init => "require 'json'"
    code => "
        e = event['event'].split(',')
        ary = Array.new
        e.each do |x|
            hash = JSON.parse(x)
            hash.each do |key, value|
                ary.push( { key =>  value } )
            end
        end
        event['result'] = ary
    "
}

АКТУАЛИЗИРАНЕ

Опция д) Ruby2

След известно тестване това може да е най-добрият подход. Използвайте това след вашия kv филтър:

ruby  {
    init => "require 'json'"
    code => "event['result'] = JSON.parse(event['event'])"
}
person hurb    schedule 04.08.2015