Akka ClusterSingletonProxy для удаленного развернутого синглтона

Я пытаюсь отправить сообщение одноэлементному актеру, который был развернут на удаленном узле через другого актера.

Это менеджер, который ожидает события memberUp, затем развертывает Worker актор на этом узле, а затем отправляет синглтону сообщение:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "worker",
        terminationMessage = End,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))))

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/singleton/worker",
        role = Some(s"worker")), "worker") ! "hello"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

Это рабочий:

object Worker extends App{
  ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

И application.conf:

manager {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "manager"

    }
    remote.netty.tcp.port = 2552

  }
}

worker {
  akka {
    cluster {
      auto-down-unreachable-after = 20s
      seed-nodes = [
        "akka.tcp://[email protected]:2552"
      ]
      roles.1 = "worker"
    }
    remote.netty.tcp.port = 2554
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
  }
}

Рабочий процесс инициализирован (и я вижу в логах сообщение state change [Start -> Oldest]), но сообщение, отправленное менеджером, никогда не приходит рабочему процессу. Раньше он работал нормально, когда я развертывал синглтон на удаленном узле, но теперь я хочу, чтобы менеджер развернул его.

Я также попытался развернуть его как дочерний элемент менеджера (используя контекст вместо context.system) и изменил путь синглтона на user/manager/singleton/worker, но это не сработало.

Я использую Акка 2.3.11

Редактировать: файл sbt:

name := "MyProject"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies +=
    "com.typesafe.akka" %% "akka-actor" % "2.3.11",
    "com.typesafe.akka" %% "akka-cluster" % "2.3.11",
    "joda-time" % "joda-time" % "2.0",
    "com.typesafe.akka" %% "akka-contrib" % "2.3.11"

person user_s    schedule 27.06.2017    source источник
comment
Не могли бы вы опубликовать свой build.sbt? Легче воспроизвести без необходимости искать вместе все библиотеки.   -  person thwiegan    schedule 27.06.2017
comment
Я отредактировал свой пост с файлом sbt   -  person user_s    schedule 27.06.2017
comment
Спасибо, пакет для ClusterSingleton изменился в 2.5 и я не мог понять, что это было с 2.3.11   -  person thwiegan    schedule 27.06.2017
comment
Есть ли причина, по которой вы хотите удаленно развернуть синглтон кластера на рабочем месте?   -  person thwiegan    schedule 27.06.2017
comment
Не уверен, в чем вопрос. Если вы спрашиваете, почему мне нужен синглтон, а не обычный актор, то это потому, что в моем реальном приложении у меня есть другой менеджер, и я не хочу перезапускать двух воркеров (мне нужен только один воркер в кластере). Если вы спрашиваете, почему бы мне просто не перезапустить актор из нового процесса, то это потому, что менеджер также развертывает обычных акторов, поэтому было бы неплохо иметь логику только в одном месте - в менеджере.   -  person user_s    schedule 27.06.2017
comment
Отредактировал мой ответ, так как я заставил его работать после некоторых попыток.   -  person thwiegan    schedule 27.06.2017


Ответы (1)


Поэтому я немного поиграл с различными вариантами создания ClusterSingletonManager и думаю, что их удаленное развертывание нарушает что-то в шаблоне singleton. Для этого я собрал несколько индикаторов:

  • Поскольку это удаленное развертывание, путь ClusterSingletonManager на рабочем узле — /remote/akka.tcp/[email protected]:2552/user/worker. Я не думаю, что библиотека может/справится с этим, так как она ожидает /user/worker

  • При попытке отправить сообщение с главного узла с помощью ClusterSingletonProxy log в режиме DEBUG состояния No singleton available, stashing message hello worker и Trying to identify singleton at akka.tcp://[email protected]:2552/user/worker/singleton (сбой и повторная попытка) -> Он ищет синглтон на неправильном узле, так как нет доступного менеджера, и он очевидно, не знает, что синглтон находится на рабочем узле.

При создании ClusterSingletonManager непосредственно на рабочем узле все работает как положено.

У вас также возникла проблема с назначением менеджера. Ваш singletonName - это worker, а сам ваш менеджер (актер) не имеет никакого имени. При создании прокси вы используете путь /user/singleton/worker, но путь должен быть следующим: /user/{actorName}/{singletonName}. Поэтому в своем коде я использовал worker в качестве имени актера и singleton в качестве singletonName.

Итак, вот мой рабочий код:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"/user/worker/singleton",
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))

  sys.actorOf(ClusterSingletonManager.props(
    singletonProps = Props(classOf[Worker]),
    singletonName = "singleton",
    terminationMessage = PoisonPill,
    role = Some("worker")), name = "worker")
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}

application.conf и build.sbt остались прежними.

РЕДАКТИРОВАТЬ

Получил его для работы, сославшись на ClusterSingletonProxy с фактическим путем на рабочем узле (рассчитывая, что это сетевой путь). Я не уверен, что порекомендовал бы это, так как я все еще не уверен, предназначена ли эта библиотека для этого, но она работает, по крайней мере, в этом минимальном примере:

object Manager extends App {
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("manager"))
  sys.actorOf(Props[Manager], "manager")
}

class Manager extends Actor with ActorLogging {
  override def receive: Receive = {
    case MemberUp(member) if member.address != Cluster(context.system).selfAddress =>
      val ref = context.system.actorOf(ClusterSingletonManager.props(
        singletonProps = Props(classOf[Worker]),
        singletonName = "singleton",
        terminationMessage = PoisonPill,
        role = Some("worker")).withDeploy(Deploy(scope = RemoteScope(member.address))), name = "worker")

      context.actorOf(ClusterSingletonProxy.props(
        singletonPath = s"${ref.path.toStringWithoutAddress}/singleton", // /remote/akka.tcp/[email protected]:2552/user/worker/singleton
        role = Some("worker")), name = "workerProxy") ! "hello worker"
  }
  override def preStart(): Unit = {
    Cluster(context.system).subscribe(self,classOf[MemberUp])
  }
}

object Worker extends App{
  val sys = ActorSystem("mySys", ConfigFactory.load("application").getConfig("worker"))
}

class Worker extends Actor with ActorLogging {

  override def receive: Receive = {
    case msg =>
      println(s"GOT MSG : $msg from : ${sender().path.name}")
  }
}
person thwiegan    schedule 27.06.2017
comment
Спасибо - сработало. Однако это работает только для 1 актера. Если я попытаюсь добавить еще одного менеджера и другого работника, где менеджер2 развертывает второй синглтон-воркер2, затем запускает прокси-сервер и отправляет сообщение, сообщение не достигает работника1 (который является активным синглтоном). Странно, что они не хотели разрешать удаленное развертывание синглетонов, но разрешают вызывать withDeploy для реквизитов ClusterSingletonManager. Интересно, в новых версиях так же? - person user_s; 28.06.2017