Будущие Scala и Java, по-видимому, имеют неожиданные взаимодействия

Мы используем Elasticsearch 0.90.7 в нашем приложении Scala Play Framework, где конец нашего метода «doSearch» выглядит так:

def doSearch(...) = {
  ...
  val actionRequessBuilder: ActionRequestBuilder // constructed earlier in the method
  val executedFuture: ListenableActionFuture<Response> = actionRequestBuilder.execute
  return executedFuture.actionGet
}

где ListenableActionFuture расширяет java.util.concurrent.Future, а ListenableActionFuture#actionGet в основном совпадает с Future#get

Все это прекрасно работает, когда мы выполняем поиск последовательно, однако, когда мы пытаемся выполнить несколько поисков параллельно:

val search1 = scala.concurrent.Future(doSearch(...))
val search2 = scala.concurrent.Future(doSearch(...))
return Await.result(search1, defaultDuration) -> Await.result(search2, defaultDuration))

мы иногда (менее 1 или 2% времени) получаем неожиданные тайм-ауты на наших фьючерсах scala, даже при использовании чрезвычайно длительного тайм-аута во время qa (5 секунд, где поиск всегда выполняется менее чем за 200 мс). Это также происходит при использовании глобального контекста выполнения scala, а также при использовании контекста выполнения Play по умолчанию.

Происходит ли здесь какое-то неожиданное взаимодействие из-за того, что будущее java заключено в будущее Scala? Я бы подумал, что actionGet вызов java-фьючерса в конце doSearch предотвратит взаимодействие двух фьючерсов друг с другом, но, очевидно, это может быть не так.


person Zim-Zam O'Pootertoot    schedule 17.03.2014    source источник
comment
Может быть не в состоянии, но если вы можете написать тестовый пример, который другие могут запустить и воссоздать проблему, это может помочь в отладке.   -  person John Vint    schedule 17.03.2014
comment
@JohnVint Я пытался сделать именно это, но, к сожалению, низко висящие плоды (do while(i < 100) { i = i + 1; val f1 = Future(doSearch(...)); val f2 = Future(doSearch(...)); Await(f1, defaultDuration); Await(f2, defaultDuration)}) не вызывают проблемы; Я продолжу играть с этим, чтобы постоянно получать ошибку   -  person Zim-Zam O'Pootertoot    schedule 17.03.2014


Ответы (1)


Думал, где-то установлено, что блокировка - это зло. Зло!

В этом случае Await.result заблокирует текущий поток, поскольку он ожидает результата.

Await оборачивает вызов в blocking, пытаясь уведомить пул потоков о том, что он может захотеть увеличить количество потоков, чтобы сохранить желаемый параллелизм и избежать взаимоблокировок.

Если текущий поток не является Scala BlockContext, то вы получаете простую блокировку.

Какой бы ни была ваша точная конфигурация, предположительно вы держите поток, пока он заблокирован, и преобразователь, который вы запускаете для поиска, хочет что-то запустить и не может, потому что пул исчерпан.

Что имеет значение, так это то, какой пул создал текущий поток: не имеет значения, находится ли промежуточный объект Future в другом пуле, если, по сути, вам нужно использовать больше потоков из текущего пула, а он исчерпан.

Конечно, это всего лишь предположение.

Имеет больше смысла иметь одно будущее, которое получает значение из обоих поисков с тайм-аутом.

Но если вы столкнетесь с несколькими фьючерсами, имеет смысл использовать Future.sequence и подождать.

person som-snytt    schedule 17.03.2014