Групиране на елементи в Scala / Akka Streams

Да предположим, че имам източник на различни плодове и искам да вмъкна техния брой в база данни.

Мога да направя нещо подобно:

Flow[Fruits]
.map { item =>
    insertItemToDatabase(item)
}

Но това очевидно е бавно – защо да вмъквам в база данни с всеки елемент, когато мога да ги групирам? Така че измислих по-добро решение:

Flow[Fruits]
.grouped(10000)
.map { items =>
    insertItemsToDatabase(items)
}

Но това означава, че трябва да държа 10 000 елемента [banana, orange, orange, orange, banana, ...] в паметта, докато не бъдат изхвърлени в базата данни. Това не е ли неефективно? Може би мога да направя нещо подобно:

Flow[Fruits]
.grouped(100)
.map { items =>
    consolidate(items)  // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
    insertMapToDatabase(mapOfItems)
}

Доколкото разбирам, това също трябва да обработва 10 000 елемента наведнъж, но не трябва да заема толкова много памет (при условие, че елементите се повтарят често). Но всеки клавиш все още се повтаря 100 пъти в паметта. Разбира се, че мога да направя .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()... Но няма ли по-добър начин? Може би нещо подобно:

Flow[Fruits]
.map { item =>
    addToMap(item)
    if(myMap.length == 10000) {
        insertToDatabase(myMap)
        clearMyMap()
    }
}

Но не нарушава ли концепцията на потоците Akka, а именно независимостта (и следователно едновременността) на етапите на обработка?


person Honza Zíka    schedule 21.08.2017    source източник
comment
Разгледайте функцията groupedWithin. Отнема два параметъра: максимална граница на елементите и времева скорост. Например .groupedWithnin(5000, 1.seconds) ще даде 5000 елемента за обработка, ако сте го достигнали преди 1 секунда или ще даде броя елементи, натрупани за 1 секунда.   -  person alifirat    schedule 21.08.2017
comment
Благодаря на @alifirat за твоето предложение, но това е просто различен начин за групиране. Това, от което се нуждая, е различен начин за обработка на данните, които имам, удобен както за паметта, така и за базата данни.   -  person Honza Zíka    schedule 21.08.2017


Отговори (1)


Ако кардиналността на набора Fruit е ниска, тогава можете да запазите единствена карта с всички преброявания и след това да я изчистите в базата данни след поточно предаване през всички стойности на Fruit.

Първо, изградете поток, който ще поддържа текущия брой:

type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount = 
  Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
  (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)

Сега създайте Sink, който ще получи последните FruitCount и ще материализира потока:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] = 
  fruitSource
    .via(fruitCountFlow)
    .to(lastFruitCountSink)
    .run()

След това lastFruitCountFut може да се използва за изпращане на стойности към базата данни:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
  insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

Iterator се използва, защото е най-ефективната колекция от памет за конструиране на TraversableOnce от елементи Fruit.

Това решение ще запази само 1 Map в паметта, което ще има 1 ключ за всеки отделен тип плод и 1 цяло число за всеки ключ.

person Ramón J Romero y Vigil    schedule 22.08.2017