Множественные вызовы Future в методе получения Актера

Я пытаюсь сделать два внешних вызова (к базе данных Redis) внутри метода receive Актера. Оба вызова возвращают Future, и мне нужен результат первого Future внутри второго. Я заключаю оба вызова в транзакцию Redis, чтобы никто другой не мог изменить значение в базе данных, пока я его читаю.

Внутреннее состояние актора обновляется на основе значения второго Future.

Вот как выглядит мой текущий код, который я неверен, потому что я обновляю внутреннее состояние актера внутри обратного вызова Future.onComplete.

Я не могу использовать шаблон PipeTo, потому что мне нужно, чтобы оба Future были в транзакции. Если я использую Await для первого Future, мой метод получения будет блокировать. Любая идея, как это исправить?

Мой второй вопрос связан с тем, как я использую Futures. Правильно ли это использование Futures ниже? Есть ли лучший способ работы с несколькими фьючерсами в целом? Представьте, если бы было 3 или 4 Future, каждое из которых зависит от предыдущего.

import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient

import scala.concurrent.Future
import scala.util.{Failure, Success}


object GetSubscriptionsDemo extends App {
  val akkaSystem = akka.actor.ActorSystem("redis-demo")
  val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
  actor ! UpdateState
}

case object UpdateState

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  //required by Future
  implicit val ctx = context dispatcher

  var rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => {
      log.info("Start of UpdateState ...")

      val tran = rClient.transaction()

      val zf: Future[Long] = tran.zcard(key)  //FIRST Future 
      zf.onComplete {

        case Success(z) => {
          //SECOND Future, depends on result of FIRST Future 
          val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 
          rf.onComplete {
            case Success(x) => {
              //convert ByteString to UTF8 String
              val v = x.map(_.utf8String)
              log.info(s"Updating state with $v ")
              //update actor's internal state inside callback for a Future
              //IS THIS CORRECT ?
              mutableState ++ v
            }
            case Failure(e) => {
              log.warning("ZRANGE future failed ...", e)
            }
          }
        }
        case Failure(f) => log.warning("ZCARD future failed ...", f)
      }
      tran.exec()

    }
  }

}

Компилируется, но когда я запускаю, он срабатывает.

2014-08-07  INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379

ОБНОВЛЕНИЕ 1

Чтобы использовать шаблон pipeTo, мне понадобится доступ к tran и ПЕРВОМу будущему (zf) в актере, куда я передаю Future, потому что ВТОРОЕ Future зависит от значения (z) ПЕРВОГО.

    //SECOND Future, depends on result of FIRST Future 
      val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 

person Soumya Simanta    schedule 07.08.2014    source источник


Ответы (2)


Не зная слишком много о клиенте Redis, который вы используете, я могу предложить альтернативное решение, которое должно быть чище и не будет иметь проблем с закрытием изменяемого состояния. Идея состоит в том, чтобы использовать ситуацию типа мастер/рабочий, когда мастер (SimpleRedisActor) получает запрос на выполнение работы, а затем делегирует его рабочему процессу, который выполняет работу и отвечает состоянием для обновления. Это решение будет выглядеть примерно так:

object SimpleRedisActor{
  case object UpdateState
  def props(ip:String, key:String) = Props(classOf[SimpleRedisActor], ip, key)
}

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
  import SimpleRedisActor._
  import SimpleRedisWorker._

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  val rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => 
      log.info("Start of UpdateState ...")      
      val worker = context.actorOf(SimpleRedisWorker.props)
      worker ! DoWork(rClient, key)

    case WorkResult(result) =>
      mutableState ++ result

    case FailedWorkResult(ex) =>
      log.error("Worker got failed work result", ex)
  }
}

object SimpleRedisWorker{
  case class DoWork(client:RedisClient, key:String)
  case class WorkResult(result:Seq[String])
  case class FailedWorkResult(ex:Throwable)
  def props = Props[SimpleRedisWorker]
}

class SimpleRedisWorker extends Actor{
  import SimpleRedisWorker._
  import akka.pattern.pipe
  import context._

  def receive = {
    case DoWork(client, key) =>
      val trans = client.transaction()
      trans.zcard(key) pipeTo self
      become(waitingForZCard(sender, trans, key) orElse failureHandler(sender, trans))
  }

  def waitingForZCard(orig:ActorRef, trans:RedisTransaction, key:String):Receive = {      
    case l:Long =>
      trans.zrange(key, l -1, l) pipeTo self
      become(waitingForZRange(orig, trans) orElse failureHandler(orig, trans))
  }

  def waitingForZRange(orig:ActorRef, trans:RedisTransaction):Receive = {
    case s:Seq[ByteString] =>
      orig ! WorkResult(s.map(_.utf8String))
      finishAndStop(trans)
  }

  def failureHandler(orig:ActorRef, trans:RedisTransaction):Receive = {
    case Status.Failure(ex) => 
      orig ! FailedWorkResult(ex)
      finishAndStop(trans)   
  }

  def finishAndStop(trans:RedisTransaction) {
    trans.exec()
    context stop self
  }
}

Рабочий начинает транзакцию, затем вызывает Redis и в конечном итоге завершает транзакцию, прежде чем остановиться. Когда он вызывает redis, он получает будущее и передает его обратно самому себе для продолжения обработки, меняя метод получения между ними как механизм отображения прогресса в его состояниях. В такой модели (которая, как я полагаю, чем-то похожа на шаблон ядра ошибки) мастер владеет состоянием и защищает его, делегируя «рискованную» работу ребенку, который может понять, каким должно быть изменение состояния. но изменение по-прежнему принадлежит хозяину.

Опять же, я понятия не имею о возможностях клиента Redis, который вы используете, и достаточно ли он безопасен, чтобы делать такие вещи, но это не совсем так. Цель состояла в том, чтобы показать более безопасную структуру для выполнения чего-то подобного, включающего фьючерсы и состояния, которые необходимо безопасно изменить.

person cmbaxter    schedule 07.08.2014

Использование обратного вызова для изменения внутреннего состояния не является хорошей идеей, выдержка из документы akka:

При использовании будущих обратных вызовов, таких как onComplete, onSuccess и onFailure, внутри акторов необходимо тщательно избегать закрытия ссылки на содержащий актор, т. е. не вызывать методы или обращаться к изменяемому состоянию включающего актора из обратного вызова.

Почему вы беспокоитесь о pipeTo и транзакциях? Не уверен, как работают транзакции Redis, но я предполагаю, что транзакция в любом случае не включает обратный вызов onComplete во втором будущем.

Я бы вынес государство в отдельный актор, которому вы тоже трубите будущее. Таким образом, у вас есть отдельный почтовый ящик, и порядок в нем будет таким же, как и порядок сообщений, пришедших для изменения состояния. Кроме того, если поступают какие-либо запросы на чтение, они также будут помещены в правильном порядке.

Редактировать, чтобы ответить на отредактированный вопрос: Итак, вы не хотите передавать первое будущее, это имеет смысл и не должно быть проблемой, поскольку первый обратный вызов безвреден. Обратный вызов второго будущего представляет собой проблему, поскольку он манипулирует состоянием. Но это будущее может быть реализовано без необходимости доступа к транзакции.

Итак, в основном мое предложение:

val firstFuture = tran.zcard
firstFuture.onComplete {
   val secondFuture = tran.zrange
   secondFuture pipeTo stateActor
}

С stateActor, содержащим изменяемое состояние.

person pushy    schedule 07.08.2014
comment
Я обновил свой вопрос. Я понимаю, что не могу изменить состояние моего Актера внутри обратного вызова Future. См. ОБНОВЛЕНИЕ 1, чтобы получить представление о моей проблеме с использованием pipeTo - person Soumya Simanta; 07.08.2014