Контроль появления актеров в Akka, которые потребляют заметное количество памяти

Я построил распределенную модель потокового машинного обучения, используя модель актера akka. Обучение модели происходит асинхронно путем отправки экземпляра обучения (данные для обучения) актеру. Обучение на этих данных занимает время вычислений и изменяет состояние актера.

В настоящее время я использую исторические данные для обучения модели. Я хочу запустить несколько моделей с разной конфигурацией, обученных на одних и тех же данных, и посмотреть, как различаются разные метрики ансамбля. По сути, это гораздо менее сложная симуляция того, что у меня происходит с Thread.sleep(1) и массивом данных, представляющим время и состояние вычислений.

implicit val as = ActorSystem()

case object Report

case class Model(dataSize: Int) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val trainingData = Array.fill(5000)(Random.nextInt)

val dataSizeParams = (1 to 500)

Затем я использую цикл for для изменения параметров (представленных массивом dataSizeParams)

for {
  param <- dataSizeParams
} {
  // make model with params
  val model = Model(param)
  for {
    trainingInstance <- trainingData
  } {
    model.train(trainingInstance)
  }
  model.report
}

Цикл for определенно НЕПРАВИЛЬНЫЙ СПОСОБ делать то, что я пытаюсь сделать. Он запускает все разные модели параллельно. Это хорошо работает, когда dataSizeParams находится в диапазоне от 1 до 500, но если я увеличу это до чего-то высокого, мои модели КАЖДАЯ начнут занимать заметные куски памяти. То, что я придумал, это код ниже. По сути, у меня есть Мастер моделей, который может контролировать количество одновременно работающих моделей на основе количества полученных им сообщений Run. Каждая модель теперь содержит ссылку на этого главного актера и отправляет ему сообщение, когда он закончит обработку:

// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run

case class Model(dataSize: Int, master: ActorRef) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Tread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          master ! ImDone
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val master: ActorRef = actor(new Act {
  var paramRuns = dataSizeParams.toIterator
  become {
    case Run => {
      if (paramRuns.hasNext) {
        val model = Model(paramRuns.next(), self)
        for {
          trainingInstance <- trainingData
        } {
          model.train(trainingInstance)
        }
        model.report
      } else {
        println("No more to run")
        context.stop(self)
      }
    }
    case ImDone =>  {
      self ! Run
    }
  }
})

master ! Run

С мастер-кодом (как я вижу) проблем нет. У меня есть жесткий контроль над количеством моделей, созданных за один раз, но я чувствую, что мне не хватает гораздо более простого/чистого/нестандартного способа сделать это. Также мне было интересно, есть ли какие-нибудь изящные способы ограничить количество одновременно работающих моделей, скажем, изучив использование ЦП и памяти системы.


person Andrew Cassidy    schedule 10.01.2014    source источник
comment
Использование роутера и пула актеров не для вас? Пул построителей моделей будет ограничен размером, установленным вами, и каждый построитель моделей пула не будет переходить к следующему сообщению в почтовом ящике до тех пор, пока не завершит построение своей текущей модели, сохраняя при этом больший контроль над памятью и процессором.   -  person cmbaxter    schedule 11.01.2014
comment
Это смешно. Мы пытались не управлять потоками вручную с помощью Akka, и в итоге мы управляли акторами... Я столкнулся с похожей проблемой. Кажется, что цикл for для создания актеров — действительно плохая идея.   -  person windweller    schedule 07.06.2014


Ответы (1)


Вы ищете модель рабочего подтягивания. Я настоятельно рекомендую этот пост в блоге от разработчиков Akka:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2

Мы используем вариант этого в дополнение к функциям кластеризации Akka, чтобы избежать мошеннического параллелизма. Если рабочие субъекты вытягивают работу, а не супервизор отправляет, вы можете изящно контролировать объем работы (и, следовательно, использование ЦП и памяти), просто ограничивая количество рабочих актеров.

У этого есть несколько преимуществ по сравнению с чистыми маршрутизаторами: легче отслеживать сбои (как указано в этом посте), и работа не будет томиться в почтовом ящике (который потенциально может быть потерян).

Кроме того, если вы используете удаленное взаимодействие, я рекомендую вам не отправлять в сообщении большие объемы данных. Пусть рабочие узлы сами извлекают данные из другого источника при срабатывании. Мы используем S3.

person Ryan    schedule 11.01.2014