Я пытаюсь сделать два внешних вызова (к базе данных Redis) внутри метода receive
Актера. Оба вызова возвращают Future
, и мне нужен результат первого Future
внутри второго. Я заключаю оба вызова в транзакцию Redis, чтобы никто другой не мог изменить значение в базе данных, пока я его читаю.
Внутреннее состояние актора обновляется на основе значения второго Future
.
Вот как выглядит мой текущий код, который я неверен, потому что я обновляю внутреннее состояние актера внутри обратного вызова Future.onComplete
.
Я не могу использовать шаблон PipeTo
, потому что мне нужно, чтобы оба Future были в транзакции. Если я использую Await
для первого Future
, мой метод получения будет блокировать. Любая идея, как это исправить?
Мой второй вопрос связан с тем, как я использую Future
s. Правильно ли это использование Future
s ниже? Есть ли лучший способ работы с несколькими фьючерсами в целом? Представьте, если бы было 3 или 4 Future, каждое из которых зависит от предыдущего.
import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient
import scala.concurrent.Future
import scala.util.{Failure, Success}
object GetSubscriptionsDemo extends App {
val akkaSystem = akka.actor.ActorSystem("redis-demo")
val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
actor ! UpdateState
}
case object UpdateState
class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
//mutable state that is updated on a periodic basis
var mutableState: Set[String] = Set.empty
//required by Future
implicit val ctx = context dispatcher
var rClient = RedisClient(ip)(context.system)
def receive = {
case UpdateState => {
log.info("Start of UpdateState ...")
val tran = rClient.transaction()
val zf: Future[Long] = tran.zcard(key) //FIRST Future
zf.onComplete {
case Success(z) => {
//SECOND Future, depends on result of FIRST Future
val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z)
rf.onComplete {
case Success(x) => {
//convert ByteString to UTF8 String
val v = x.map(_.utf8String)
log.info(s"Updating state with $v ")
//update actor's internal state inside callback for a Future
//IS THIS CORRECT ?
mutableState ++ v
}
case Failure(e) => {
log.warning("ZRANGE future failed ...", e)
}
}
}
case Failure(f) => log.warning("ZCARD future failed ...", f)
}
tran.exec()
}
}
}
Компилируется, но когда я запускаю, он срабатывает.
2014-08-07 INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379
ОБНОВЛЕНИЕ 1
Чтобы использовать шаблон pipeTo
, мне понадобится доступ к tran
и ПЕРВОМу будущему (zf
) в актере, куда я передаю Future
, потому что ВТОРОЕ Future
зависит от значения (z
) ПЕРВОГО.
//SECOND Future, depends on result of FIRST Future
val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z)