В среде scala spray, как я могу создать несколько клиентов http с разными конфигурациями (например, тайм-ауты, повторные попытки)

У меня есть два http-клиента спрея, например:

  val pipelineFoo: HttpRequest => Future[Foo] = (
    ~> unmarshal[Message.Foo])

  val pipelineBar: HttpRequest => Future[Bar] = (
    ~> unmarshal[Message.Bar])

  def execFoo(h: String, p: Int): Future[Foo] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/foo")

  def execBar(h: String, p: Int): Future[Bar] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/bar")

Я хотел бы, чтобы запрос foo повторялся несколько раз с большим тайм-аутом, а запрос бара не повторялся и имел короткий тайм-аут (скажем, 1 секунду). Как я могу добиться этого в спрее (извините, если это где-то в документации, но я не смог найти - я нашел только документацию по глобальной настройке таких параметров конфигурации).

Это не должно быть слишком сложно. sendReceive на самом деле может принимать больше параметров. Например, вот подпись для одного из вариантов:

def sendReceive(transport: ActorRef)(implicit ec: ExecutionContext, futureTimeout: Timeout): SendReceive

Я сам использую это для аналогичных сценариев, когда у меня должно быть большее количество повторных попыток и более длительные тайм-ауты, когда я обращаюсь к внешней службе, а не к одной из наших внутренних.

Вот пример моего конвейера, который я использую:

lazy val pipeline: HttpRequest => Future[HttpResponse] = (
addCredentials(BasicHttpCredentials(clientConnection.credentials._1, clientConnection.credentials._2))
  ~> addHeader(`User-Agent`(ProductVersion("<YOUR NAME HERE>", "<YOUR VERSION HERE>", "http://github.com/<WHEREVER YOUR PROJECT IS>"), ProductVersion("spray-client", "1.3.1", "http://spray.io")))
  ~> logRequest(log)
  ~> sendReceive(clientConnection.connection)(clientConnection.context, clientConnection.timeout)
  ~> decode(Deflate)
  ~> decode(Gzip)

В clientConnection нет ничего особенного. Это просто созданный мной класс case, который можно заполнить вручную с помощью кода или, возможно, какой-либо конфигурации в вашем application.conf.

@ Ларри Андерсон, ты прав. Я никогда не устанавливал количество повторных попыток в коде. Возможно, есть способ, но я обычно просто устанавливаю max-retries в spray.can.host-connector в пределах application.conf
Это не изменяет какой-либо из spray.can.host-connector программно, это просто явная установка тайм-аута для ActorRef.

2 года спустя, но, возможно, стоит для других людей. У нас была такая же потребность, и мы основывали наше решение на копировании/вставке файлов коннектора Spray.

import akka.actor.{ActorRef, ActorSystem}
import akka.io.IO
import akka.pattern.ask
import com.typesafe.config.Config
import spray.can.Http
import spray.can.Http.HostConnectorSetup
import spray.can.client.HostConnectorSettings
import spray.client.pipelining.sendReceive
import spray.http.Uri.Host
import spray.http.{HttpRequest, HttpResponse, Uri}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}

case class HttpCustomSettings(
  requestTimeout: Duration,
  maxRetries:     Int,
  maxConnections: Int

 * Implement a new HTTP client on top of akka IO and spray HTTP
 * to provide a way for caller to set client parameters on request basis instead
 * of globally in application.conf
 * This client defaults all its configuration with the one set in spray.conf
 * see spray.can.client and spray.can.host-connector
 * But you can override some of them on demand
 * - maxRetries
 * - requestTimeout
 * - maxConnections
class HttpClient(actorSystem: ActorSystem, config: Config) {
  private implicit val context: ActorSystem = actorSystem
  private implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher

  private val HTTP = "http"
  private val HTTPS = "https"

  private val defaultSettings: HostConnectorSettings =

  //not configurable since this timeout has little to no use practically
  //this timeout DOES NOT kill the open connection
  private implicit val clientFutureTimeout: akka.util.Timeout = 5.seconds

  def send(
    request:        HttpRequest,
    customSettings: Option[HttpCustomSettings] = None
  ): Future[HttpResponse] = {
    val pipeline: Future[HttpRequest ⇒ Future[HttpResponse]] =
      pipelineForUri(request.uri, customSettings)

    pipeline.flatMap(send ⇒ send(request))

   * To understand more this method
   * @see http://kamon.io/assets/img/diagrams/spray-client-actors.png
   * @see [[spray.can.HttpManager]]
   * @see [[spray.can.client.HttpHostConnector]]
   * @see [[spray.can.Http]]
  private def pipelineForUri(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): Future[HttpRequest ⇒ Future[HttpResponse]] = {
    for { 
      Http.HostConnectorInfo(connector, _) ← IO(Http) ? connectorSetup(uri, customSettings) 
} yield sendReceive(connector)

  private def connectorSetup(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): HostConnectorSetup = {
      uri.scheme == HTTP || uri.scheme == HTTPS,
      s"Not a valid $HTTP URI scheme: '${uri.scheme}' in '$uri'. (Did you forget $HTTP:// ?)"

    val connector: HostConnectorSetup = HostConnectorSetup(
      sslEncryption = uri.scheme == HTTPS

    customSettings match {
      case Some(custom) ⇒ connector.copy(settings = Option(mapCustomSettings(defaultSettings, custom)))
      case None         ⇒ connector.copy(settings = Option(defaultSettings))

  private def mapCustomSettings(
    settings:       HostConnectorSettings,
    customSettings: HttpCustomSettings
  ): HostConnectorSettings = {
      maxRetries = customSettings.maxRetries,
      maxConnections = customSettings.maxConnections,
      connectionSettings = settings.connectionSettings.copy(requestTimeout = customSettings.requestTimeout)

также может быть полезно переопределить цикл сбора данных, если для параметра requestTimeout установлено значение ‹250 мс.