Да предположим, че имам източник на различни плодове и искам да вмъкна техния брой в база данни.
Мога да направя нещо подобно:
Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}
Но това очевидно е бавно – защо да вмъквам в база данни с всеки елемент, когато мога да ги групирам? Така че измислих по-добро решение:
Flow[Fruits]
.grouped(10000)
.map { items =>
insertItemsToDatabase(items)
}
Но това означава, че трябва да държа 10 000 елемента [banana, orange, orange, orange, banana, ...]
в паметта, докато не бъдат изхвърлени в базата данни. Това не е ли неефективно? Може би мога да направя нещо подобно:
Flow[Fruits]
.grouped(100)
.map { items =>
consolidate(items) // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
insertMapToDatabase(mapOfItems)
}
Доколкото разбирам, това също трябва да обработва 10 000 елемента наведнъж, но не трябва да заема толкова много памет (при условие, че елементите се повтарят често). Но всеки клавиш все още се повтаря 100 пъти в паметта. Разбира се, че мога да направя .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()
... Но няма ли по-добър начин? Може би нещо подобно:
Flow[Fruits]
.map { item =>
addToMap(item)
if(myMap.length == 10000) {
insertToDatabase(myMap)
clearMyMap()
}
}
Но не нарушава ли концепцията на потоците Akka, а именно независимостта (и следователно едновременността) на етапите на обработка?
groupedWithin
. Отнема два параметъра: максимална граница на елементите и времева скорост. Например.groupedWithnin(5000, 1.seconds)
ще даде 5000 елемента за обработка, ако сте го достигнали преди 1 секунда или ще даде броя елементи, натрупани за 1 секунда. - person alifirat   schedule 21.08.2017