Logstash - ›Elasticsearch - обновить денормализованные данные

Объяснение варианта использования

У нас есть реляционная база данных с данными о наших повседневных операциях. Цель состоит в том, чтобы позволить пользователям искать важные данные с помощью полнотекстовой поисковой системы. Данные нормализованы и, таким образом, находятся не в лучшей форме для выполнения полнотекстовых запросов, поэтому идея заключалась в денормализации подмножества данных и их копировании в режиме реального времени в Elasticsearch, что позволяет нам создавать быстрое и точное поисковое приложение. .

У нас уже есть система, которая позволяет Event Sourcing для наших операций с базой данных (вставки, обновления, удаления ). События содержат только измененные столбцы и первичные ключи (при обновлении мы не получаем всю строку). Logstash уже получает уведомление о каждом событии, поэтому эта часть уже обработана.


Актуальная проблема

Теперь мы подошли к нашей проблеме. Поскольку мы планируем денормализовать наши данные, мы должны убедиться, что обновления родительских объектов распространяются на денормализованные дочерние объекты в Elasticsearch. Как мы можем настроить logstash для этого?

Пример

Допустим, мы ведем список Employees в Elasticsearch. Каждому Employee присваивается Company. Поскольку данные денормализованы (для ускорения поиска), каждый Employee также несет имя и адрес Company. Обновление изменяет имя Company - как мы можем настроить logstash для обновления названия компании во всех Employees, назначенных Company?


Дополнительное объяснение

@Darth_Vader: Проблема, с которой мы сталкиваемся, заключается в том, что мы получаем событие, что Company изменился, но мы хотим изменить документы типа Employee в Elasticsearch, потому что они несут данные о компании сами по себе. В вашем ответе ожидается, что мы будем получать событие для каждого Employee, а это не так.

Может, это прояснит. У нас в Elasticsearch 3 сотрудника:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Затем в исходной БД происходит обновление.

UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;

Мы получаем событие в logstash, где написано что-то вроде этого:

{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}

Затем это следует передать в Elasticsearch, чтобы в результате были следующие сотрудники:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

Обратите внимание, что поле company.name изменилось.


person lmazgon    schedule 01.02.2017    source источник
comment
Вы хотите обновить название компании всех сотрудников с помощью поля company, назначенного каждому и каждому из них?   -  person Kulasangar    schedule 01.02.2017
comment
Обновление не обязательно коснется всех сотрудников, но только тех, кто работает в измененной компании. И да, у каждого сотрудника есть поле company.id, company.name, company.address и т. Д.   -  person lmazgon    schedule 01.02.2017


Ответы (1)


Я предлагаю решение, аналогичное тому, что я опубликовал здесь, т. е. использовать модуль вывода http для выполнения обновления путем вызова запроса к индексу сотрудников. Запрос должен выглядеть так:

POST employees/_update_by_query
{
  "script": {
    "source": "ctx._source.company.name = params.name",
    "lang": "painless",
    "params": {
      "name": "Company NEW"
    }
  },
  "query": {
    "term": {
      "company.cmp_id": "1"
    }
  }
}

Итак, ваша конфигурация Logstash должна выглядеть так:

input {
  ... 
}
filter {
  mutate {
    add_field => {
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.company.name = params.name"
      "[script][params][name]" => "%{new.name}"
      "[query][term][company.cmp_id]" => "%{cmp_id}"
    }
    remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
  }
}
output {
  http {
    url => "http://localhost:9200/employees/_update_by_query"
    http_method => "post"
    format => "json"
  }
}
person Val    schedule 21.11.2018
comment
Прошло много времени с тех пор, как я разместил этот вопрос, поэтому я не помню никаких подробностей, но, если я не ошибаюсь, я также использовал плагин вывода http для отправки обновления по запросу запроса. Я поверю вам, что конфигурация верна. - person lmazgon; 21.11.2018
comment
Понятно, я наткнулся на этот пост по ссылке в этом проблема с github - person Val; 21.11.2018