Создать поле с помощью fluentd

У меня есть журналы, которые я использую с помощью Fluentd и отправляю в Elasticsearch. Я хотел бы создать новое поле, если будет найдена строка.

Образец журнала:

{
  "@timestamp": "2021-01-29T08:05:38.613Z",
  "@version": "1",
  "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
  "level": "INFO"
}

Я хочу создать новое поле НАЧАЛО ВРЕМЕНИ, и в этом случае значение будет 113.187.

Я пробовал использовать разделение record_transformer и ruby ​​для получения значения, но кажется, что когда оно совпадает с ним, удаляет строку, которую я хочу, из файла журнала.

<filter**>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running").last.split(")")}
  </record>
</filter>

Как я могу создать это новое поле с желаемым значением?

Я использовал предложенный ниже вариант:

<filter**>
  @type record_transformer
  enable_ruby true
  <record>
    STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}
  </record>
</filter>

Это приблизило меня. Что происходит сейчас, так это то, что создается поле STARTIME, и когда запись в журнале совпадает, оно имеет значение 113.187, что является правильным, однако каждая другая строка, которая не соответствует этому шаблону, просто добавляется в новое поле.

введите описание изображения здесь


person isa    schedule 29.01.2021    source источник
comment
Что вы имеете в виду, говоря, что когда он соответствует, удаляет строку, которую я хочу из файла журнала? Не могли бы вы немного уточнить?   -  person Azeem    schedule 30.01.2021
comment
Просто протестируйте свой образец журнала с помощью этого: STARTIME ${record["message"].split("JVM running for ").last.split(")")[0]}, а вывод через stdout - 2021-01-30 18:45:05.885183400 +0500 test: {"@timestamp":"2021-01-29T08:05:38.613Z","@version":"1","message":"Started Application in 110.374 seconds (JVM running for 113.187)","level":"INFO","STARTIME":"113.187"}.   -  person Azeem    schedule 30.01.2021
comment
@Azeem, спасибо, что сблизил меня. Что происходит сейчас, так это то, что создается поле STARTIME, и когда запись в журнале совпадает, оно дает 113.187, что является правильным, однако каждая другая строка, которая не соответствует этому шаблону, просто добавляется в новое поле.   -  person isa    schedule 01.02.2021
comment
Не могли бы вы дополнить свой вопрос примером?   -  person Azeem    schedule 01.02.2021
comment
Можете ли вы позволить себе иметь STARTIME с null, если фактическое значение не найдено?   -  person Azeem    schedule 01.02.2021
comment
Попробуйте: STARTIME ${ s = record['message'][/JVM running for \d{3}.\d{3}/]; s ? s.split(' ')[-1] : nil }   -  person Azeem    schedule 01.02.2021
comment
Поле STARTIME будет иметь допустимое значение, в противном случае null. Так что поступайте соответственно с другой стороны.   -  person Azeem    schedule 01.02.2021
comment
Это отлично работает, спасибо   -  person isa    schedule 01.02.2021
comment
Пожалуйста! Я вскоре отправлю ответ с описанием.   -  person Azeem    schedule 01.02.2021


Ответы (2)


Вы можете попробовать что-то вроде этого:

<record>
  STARTIME ${ s = record['message'][/JVM running for \d{3}.\d{3}/]; s ? s.split(' ')[-1] : nil }
</record>

STARTIME будет иметь допустимое значение, в противном случае null.

person Azeem    schedule 01.02.2021

Возможно, это не прямой ответ для решения этой проблемы с помощью преобразований Fluentd, но вы можете использовать Elasticsearch конвейеры приема вместе с процессор grok для извлечения ваших данных. Это смоделированный пример:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Enrich logs",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "(JVM running for %{NUMBER:start_time})"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-01-29T08:05:38.613Z",
        "@version": "1",
        "message": "Started Application in 110.374 seconds (JVM running for 113.187)",
        "level": "INFO"
      }
    }
  ]
}

_source - это предоставленный вами документ, и есть единственный процессор Grok для извлечения start_time из поля message. Вызов этого конвейера приводит к:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "start_time" : "113.187",
          "@timestamp" : "2021-01-29T08:05:38.613Z",
          "level" : "INFO",
          "@version" : "1",
          "message" : "Started Application in 110.374 seconds (JVM running for 113.187)"
        },
        "_ingest" : {
          "timestamp" : "2021-01-29T14:09:43.447147676Z"
        }
      }
    }
  ]
}

Вы можете видеть, что после преобразования ваш документ содержит значение "start_time" : "113.187".

person Evaldas Buinauskas    schedule 29.01.2021