Как мога да дросел съобщения до IO(Tcp) актьора в Akka

Имам такъв актьор

class TcpClientActor(target: Target) extends Actor with Logger {

  override def preStart(): Unit = {
    self ! TestConnection
  }

  override def receive: Receive = {
    case TestConnection =>
      IO(Tcp) ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)

    case failed@CommandFailed(_: Connect) =>
      info(s"Failure: $target.endpoint:$target.port")
      shutdown()

    case Connected(_, _) =>
      info(s"Success: $target.endpoint:$target.port")
      sender() ! Close
      shutdown()

  }

  def shutdown(): Unit = {
    context stop self
  }
}

Повтарям файл с крайни точки за тестване и създавам един от тези актьори с всеки ред като аргумент на конструктора от тип Target. Искам да мога да намаля броя на паралелните TCP връзки за стартиране до някакъв зададен брой, има ли вградени механизми, които мога да използвам в Akka, за да гарантирам, че няма да претоварвам системата, като просто незабавно създам TcpClientActor за всеки ред от въвеждане и стартиране на връзка с гнездо?


person conorgriffin    schedule 21.03.2019    source източник
comment
doc.akka.io/docs/akka/2.4/contrib/throttle. html   -  person Robert Harvey    schedule 21.03.2019


Отговори (1)


Бих използвал Akka Stream, за да дросел съобщенията

import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.{ ActorMaterializer, OverflowStrategy, ThrottleMode }
import akka.stream.scaladsl.{ Sink, Source }

object TcpThrottle {
  def throttler(ratePerSecond: Int, burstRate: Option[Int], bufferSize: Int = 1000)(implicit materializer: ActorMaterializer): ActorRef =
    Source.actorRef(bufferSize = bufferSize, OverflowStrategy.dropNew)
      .throttle(ratePerSecond, 1.second, burstRate.getOrElse(ratePerSecond), ThrottleMode.Shaping)
      .to(Sink.actorRef(IO(Tcp), NotUsed)
      .run()
 }

class TcpClientActor(target: Target) extends Actor with Logger {
  val throttler = TcpThrottle.throttler(1, Some(5))

  // otherwise identical

  def receive: Receive = {
    case TestConnection => throttler ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)

    // other cases identical
  }
}

Адаптирано от Ръководството за миграция на Akka 2.5. Може да е необходимо

person Levi Ramsey    schedule 28.03.2019