В среде scala spray, как я могу создать несколько клиентов http с разными конфигурациями (например, тайм-ауты, повторные попытки)

У меня есть два http-клиента спрея, например:

  val pipelineFoo: HttpRequest => Future[Foo] = (
    sendReceive
    ~> unmarshal[Message.Foo])

  val pipelineBar: HttpRequest => Future[Bar] = (
    sendReceive
    ~> unmarshal[Message.Bar])

  def execFoo(h: String, p: Int): Future[Foo] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/foo")
    pipelineFoo(Get(uri))
  }

  def execBar(h: String, p: Int): Future[Bar] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/bar")
    pipelineBar(Get(uri))
  }

Я хотел бы, чтобы запрос foo повторялся несколько раз с большим тайм-аутом, а запрос бара не повторялся и имел короткий тайм-аут (скажем, 1 секунду). Как я могу добиться этого в спрее (извините, если это где-то в документации, но я не смог найти - я нашел только документацию по глобальной настройке таких параметров конфигурации).


person jonderry    schedule 16.06.2014    source источник


Ответы (2)


Это не должно быть слишком сложно. sendReceive на самом деле может принимать больше параметров. Например, вот подпись для одного из вариантов:

def sendReceive(transport: ActorRef)(implicit ec: ExecutionContext, futureTimeout: Timeout): SendReceive

Я сам использую это для аналогичных сценариев, когда у меня должно быть большее количество повторных попыток и более длительные тайм-ауты, когда я обращаюсь к внешней службе, а не к одной из наших внутренних.

Вот пример моего конвейера, который я использую:

lazy val pipeline: HttpRequest => Future[HttpResponse] = (
addCredentials(BasicHttpCredentials(clientConnection.credentials._1, clientConnection.credentials._2))
  ~> addHeader(`User-Agent`(ProductVersion("<YOUR NAME HERE>", "<YOUR VERSION HERE>", "http://github.com/<WHEREVER YOUR PROJECT IS>"), ProductVersion("spray-client", "1.3.1", "http://spray.io")))
  ~> logRequest(log)
  ~> sendReceive(clientConnection.connection)(clientConnection.context, clientConnection.timeout)
  ~> decode(Deflate)
  ~> decode(Gzip)
)

В clientConnection нет ничего особенного. Это просто созданный мной класс case, который можно заполнить вручную с помощью кода или, возможно, какой-либо конфигурации в вашем application.conf.

person Adrian Rodriguez    schedule 24.06.2014
comment
@ Ларри Андерсон, ты прав. Я никогда не устанавливал количество повторных попыток в коде. Возможно, есть способ, но я обычно просто устанавливаю max-retries в spray.can.host-connector в пределах application.conf - person Adrian Rodriguez; 14.01.2015
comment
Это не изменяет какой-либо из spray.can.host-connector программно, это просто явная установка тайм-аута для ActorRef. - person Sparko; 25.08.2015

2 года спустя, но, возможно, стоит для других людей. У нас была такая же потребность, и мы основывали наше решение на копировании/вставке файлов коннектора Spray.

import akka.actor.{ActorRef, ActorSystem}
import akka.io.IO
import akka.pattern.ask
import com.typesafe.config.Config
import spray.can.Http
import spray.can.Http.HostConnectorSetup
import spray.can.client.HostConnectorSettings
import spray.client.pipelining.sendReceive
import spray.http.Uri.Host
import spray.http.{HttpRequest, HttpResponse, Uri}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}

case class HttpCustomSettings(
  requestTimeout: Duration,
  maxRetries:     Int,
  maxConnections: Int
)

/**
 * Implement a new HTTP client on top of akka IO and spray HTTP
 * to provide a way for caller to set client parameters on request basis instead
 * of globally in application.conf
 *
 * This client defaults all its configuration with the one set in spray.conf
 * see spray.can.client and spray.can.host-connector
 * But you can override some of them on demand
 * - maxRetries
 * - requestTimeout
 * - maxConnections
 */
class HttpClient(actorSystem: ActorSystem, config: Config) {
  private implicit val context: ActorSystem = actorSystem
  private implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher

  private val HTTP = "http"
  private val HTTPS = "https"

  private val defaultSettings: HostConnectorSettings =
    HostConnectorSettings.fromSubConfig(config.getConfig("spray.can"))

  //not configurable since this timeout has little to no use practically
  //this timeout DOES NOT kill the open connection
  //http://kamon.io/teamblog/2014/11/02/understanding-spray-client-timeout-settings/
  private implicit val clientFutureTimeout: akka.util.Timeout = 5.seconds

  def send(
    request:        HttpRequest,
    customSettings: Option[HttpCustomSettings] = None
  ): Future[HttpResponse] = {
    val pipeline: Future[HttpRequest ⇒ Future[HttpResponse]] =
      pipelineForUri(request.uri, customSettings)

    pipeline.flatMap(send ⇒ send(request))
  }

  /**
   * To understand more this method
   * @see http://kamon.io/assets/img/diagrams/spray-client-actors.png
   * @see [[spray.can.HttpManager]]
   * @see [[spray.can.client.HttpHostConnector]]
   * @see [[spray.can.Http]]
   */
  private def pipelineForUri(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): Future[HttpRequest ⇒ Future[HttpResponse]] = {
    for { 
      Http.HostConnectorInfo(connector, _) ← IO(Http) ? connectorSetup(uri, customSettings) 
} yield sendReceive(connector)
  }

  private def connectorSetup(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): HostConnectorSetup = {
    require(
      uri.scheme == HTTP || uri.scheme == HTTPS,
      s"Not a valid $HTTP URI scheme: '${uri.scheme}' in '$uri'. (Did you forget $HTTP:// ?)"
    )

    val connector: HostConnectorSetup = HostConnectorSetup(
      uri.authority.host.toString,
      uri.effectivePort,
      sslEncryption = uri.scheme == HTTPS
    )

    customSettings match {
      case Some(custom) ⇒ connector.copy(settings = Option(mapCustomSettings(defaultSettings, custom)))
      case None         ⇒ connector.copy(settings = Option(defaultSettings))
    }
  }

  private def mapCustomSettings(
    settings:       HostConnectorSettings,
    customSettings: HttpCustomSettings
  ): HostConnectorSettings = {
    settings.copy(
      maxRetries = customSettings.maxRetries,
      maxConnections = customSettings.maxConnections,
      connectionSettings = settings.connectionSettings.copy(requestTimeout = customSettings.requestTimeout)
    )
  }

}
person AlixB    schedule 23.12.2015
comment
также может быть полезно переопределить цикл сбора данных, если для параметра requestTimeout установлено значение ‹250 мс. - person AlixB; 02.03.2016