Предположим, у меня есть источник разных фруктов, и я хочу вставить их количество в базу данных.
Я могу сделать что-то вроде этого:
Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}
Но это явно медленно — зачем вставлять в базу данных каждый элемент, когда я могу их сгруппировать? Поэтому я придумал лучшее решение:
Flow[Fruits]
.grouped(10000)
.map { items =>
insertItemsToDatabase(items)
}
Но это означает, что я должен хранить 10 000 элементов [banana, orange, orange, orange, banana, ...]
в памяти, пока они не будут сброшены в базу данных. Разве это не неэффективно? Возможно, я могу сделать что-то вроде этого:
Flow[Fruits]
.grouped(100)
.map { items =>
consolidate(items) // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
insertMapToDatabase(mapOfItems)
}
Насколько я понимаю, это также должно обрабатывать 10 000 элементов одновременно, но не должно занимать столько памяти (при условии, что элементы часто повторяются). Но каждая клавиша все равно повторяется в памяти 100 раз. Конечно, я могу сделать .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()
... Но нет ли лучшего способа? Возможно что-то вроде этого:
Flow[Fruits]
.map { item =>
addToMap(item)
if(myMap.length == 10000) {
insertToDatabase(myMap)
clearMyMap()
}
}
Но не нарушает ли это концепцию потоков Akka, а именно независимость (и, следовательно, параллелизм) этапов обработки?
groupedWithin
. Он принимает два параметра: максимальное количество элементов и скорость. Например,.groupedWithnin(5000, 1.seconds)
даст 5000 элементов для обработки, если вы достигли его до 1 секунды, или даст количество элементов, накопленных за 1 секунду. - person alifirat   schedule 21.08.2017