Я построил распределенную модель потокового машинного обучения, используя модель актера akka. Обучение модели происходит асинхронно путем отправки экземпляра обучения (данные для обучения) актеру. Обучение на этих данных занимает время вычислений и изменяет состояние актера.
В настоящее время я использую исторические данные для обучения модели. Я хочу запустить несколько моделей с разной конфигурацией, обученных на одних и тех же данных, и посмотреть, как различаются разные метрики ансамбля. По сути, это гораздо менее сложная симуляция того, что у меня происходит с Thread.sleep(1) и массивом данных, представляющим время и состояние вычислений.
implicit val as = ActorSystem()
case object Report
case class Model(dataSize: Int) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Thread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val trainingData = Array.fill(5000)(Random.nextInt)
val dataSizeParams = (1 to 500)
Затем я использую цикл for для изменения параметров (представленных массивом dataSizeParams)
for {
param <- dataSizeParams
} {
// make model with params
val model = Model(param)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
}
Цикл for определенно НЕПРАВИЛЬНЫЙ СПОСОБ делать то, что я пытаюсь сделать. Он запускает все разные модели параллельно. Это хорошо работает, когда dataSizeParams находится в диапазоне от 1 до 500, но если я увеличу это до чего-то высокого, мои модели КАЖДАЯ начнут занимать заметные куски памяти. То, что я придумал, это код ниже. По сути, у меня есть Мастер моделей, который может контролировать количество одновременно работающих моделей на основе количества полученных им сообщений Run. Каждая модель теперь содержит ссылку на этого главного актера и отправляет ему сообщение, когда он закончит обработку:
// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run
case class Model(dataSize: Int, master: ActorRef) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Tread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
master ! ImDone
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val master: ActorRef = actor(new Act {
var paramRuns = dataSizeParams.toIterator
become {
case Run => {
if (paramRuns.hasNext) {
val model = Model(paramRuns.next(), self)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
} else {
println("No more to run")
context.stop(self)
}
}
case ImDone => {
self ! Run
}
}
})
master ! Run
С мастер-кодом (как я вижу) проблем нет. У меня есть жесткий контроль над количеством моделей, созданных за один раз, но я чувствую, что мне не хватает гораздо более простого/чистого/нестандартного способа сделать это. Также мне было интересно, есть ли какие-нибудь изящные способы ограничить количество одновременно работающих моделей, скажем, изучив использование ЦП и памяти системы.