Scala: для получения более 10000 документов/сообщений из Elasticsearch.

Я использую Scala 2.12, и у нас есть Elasticsearch 5.2.2. Мое требование касается только выборки/поиска на основе критериев. Поиск выдаст более 10 000 документов или сообщений за один раз. Поэтому я не могу использовать обычный поиск. Данные (каждый документ/сообщение) представляют собой сложный JSON, который я могу проанализировать позже. Поэтому мне нужно получить все такие сообщения и сохранить их в одном списке Json или что-то в этом роде. Я не очень хорошо владею Scala. Я могу использовать Elastic4s в Scala для поиска. Я вижу, что у него есть опция прокрутки и сканирования, но я не нашел полного рабочего примера. Так что ищите помощи.

Я вижу пример кода, как показано ниже, но мне нужна дополнительная помощь, чтобы получить все и разместить все, как указано выше:

client.execute {
   search in "index" / "type" query <yourquery> scroll "1m"
}

client.execute {
   search scroll <id>
}

Но как получить идентификатор прокрутки и как продолжить получение всех данных?


Обновление:

Версия scala и версия ES упомянуты выше.

Я использую следующий пример:

СБТ:

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % "7.0.2"

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "5.5.10"

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % "6.5.1"

libraryDependencies += "org.elasticsearch" % "elasticsearch" % "5.6.0"

Код:

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.Response
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.HttpClient

import com.sksamuel.elastic4s.http.ElasticDsl._

val client = HttpClient(ElasticsearchClientUri("host", 9200))

val resp1 = client.execute {
     search("index")
       .matchQuery("key", "value")
       .scroll("1m")
       .limit(500)
   }.await.result

val resp2 = client.execute {
      searchScroll(resp1.scrollId.get).keepAlive(1.minute)
    }.await

Я думаю, что не использую правильные версии для модулей elastic4s.

Исусы:

  • import com.sksamuel.elastic4s.HttpClient: он не распознает класс HttpClient. Поскольку он показывает ошибку HttpClient not found, когда я пытаюсь инициализировать переменную «клиент».

  • Далее, в моем resp2, когда я пытаюсь получить «scrollId», он не распознает это. Как получить scrollId из resp1?

В общем, чего здесь не хватает?

Обновление 2:

Я изменил версию приведенных ниже зависимостей в соответствии с примером на github (примеры)

libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.3"

Код:

val client = ElasticClient(ElasticProperties("http://host:9200"))

Теперь я получаю следующую ошибку;

Ошибка:

Symbol 'type <none>.term.BuildableTermsQuery' is missing from the classpath.
[error] This symbol is required by 'method com.sksamuel.elastic4s.http.search.SearchHandlers.BuildableTermsNoOp'.
[error] Make sure that type BuildableTermsQuery is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'SearchHandlers.class' was compiled against an incompatible version of <none>.term.
[error]     val client = ElasticClient(ElasticProperties("host:9200"))
[error]                                                 ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

person Mihir    schedule 20.06.2019    source источник
comment
Вы смотрели тесты прокрутки в elastic4s? Другое решение — использовать search_after, который не требует использования определенных конечных точек и сохранения прокрутки.   -  person Pierre-Nicolas Mougel    schedule 21.06.2019
comment
Я обновляю свой вопрос более подробно.   -  person Mihir    schedule 21.06.2019


Ответы (1)


Лично я бы использовал Akka Streams для этого типа рабочего процесса, поскольку он упрощает параллельную обработку и построение рабочего процесса.

Справочная документация может быть немного объемной, но основная идея заключается в том, что вы начинаете с источника с одним выходом... проталкиваете его через любое количество потоков... затем собираете в приемнике.

Elastic4s поддерживает использование этого (почти) изначально, поэтому вам не нужно напрямую обрабатывать прокрутки и т. д.

Я не знаю, что ты хочешь делать со своими записями. Но создать источник для ваших данных будет примерно так:

import akka.stream.scaladsl.{GraphDSL, Sink, Source}

class MyIndexer(indexName:String) {
  def getIndexSource(client:ElasticClient)(implicit actorRefFactory: ActorRefFactory) = Source.fromPublisher(
    client.publisher(search(indexName) (your-query-here) sortByFieldAsc "originalSource.ctime" scroll "5m")
  )
}

вызов MyIndexer.getIndexSource вернет вам Source[SearchHit]. Затем вы можете преобразовать SearchHit в свой объект домена, однако вы обычно обрабатываете результат от Elastic4s (в моем случае с generic.auto Circe; так же, как при использовании непотокового интерфейса вы можете использовать .to[Domainobject]).

Вы, вероятно, задаетесь вопросом об этом ActorRefFactory имплицитном; это akka ActorSystem. Если вы работаете с чем-то вроде Play framework, вы можете получить это бесплатно, используя внедрение зависимостей для запроса экземпляра ActorSystem в любом внедренном классе (например, class MyClass @Inject() (implicit sys:ActorSystem)). Если вы используете простой Scala, вы можете сделать это в своей основной функции:

  private implicit val actorSystem = ActorSystem("some-name-here")
  private implicit val mat:Materializer = ActorMaterializer.create(actorSystem)

и используйте неявные параметры для передачи этих значений туда, где они необходимы.

Пример того, как использовать это для получения последовательности всех результатов (вероятно, не совсем то, что вам нужно, учитывая описание, но хороший пример) будет работать примерно так:

import com.sksamuel.elastic4s.circe._
import io.circe.generic.auto._

val source = indexer.getIndexSource(esclient)
val resultFuture = source
  .log("logger-name-here")
  .map(_.to[Yourdomainobject])
  .toMat(Sink.seq[Yourdomainobject])(Keep.right)
  .run()

resultFuture
  .map(resultSeq=>{ do stuff with result seq })
  .recover({
      case err:Throwable=>{handle error}
  })

Теперь, если вы хотите эффективно выполнять свою обработку, вам нужно реализовать ее как GraphStages и подключить ее к потоку здесь. Я реализовал кучу сканеров, которые работают с несколькими сотнями тысяч объектов, и каждый из них представляет собой не что иное, как функцию Main, которая настраивает и запускает поток, выполняющий всю фактическую обработку.

Я склонен разрабатывать свою логику в виде блок-схемы, а затем реализовывать каждый блок диаграммы как отдельный GraphStage akka, затем соединять их вместе и использовать встроенные элементы, такие как Broadcast и Merge, чтобы получить хорошую параллельную обработку.

Надеюсь, что это будет полезно!

person fredex42    schedule 21.11.2019
comment
О, и я бы порекомендовал сохранить все эти зависимости elastic4s одной и той же версии, чтобы вы не получали эти надоедливые ошибки загрузчика классов. Когда я возюсь с версиями библиотеки, мне иногда приходится делать sbt clean и удалять мой каталог target/scala-2.12, чтобы он не запутался. - person fredex42; 21.11.2019