Группировка элементов в Scala/Akka Streams

Предположим, у меня есть источник разных фруктов, и я хочу вставить их количество в базу данных.

Я могу сделать что-то вроде этого:

Flow[Fruits]
.map { item =>
    insertItemToDatabase(item)
}

Но это явно медленно — зачем вставлять в базу данных каждый элемент, когда я могу их сгруппировать? Поэтому я придумал лучшее решение:

Flow[Fruits]
.grouped(10000)
.map { items =>
    insertItemsToDatabase(items)
}

Но это означает, что я должен хранить 10 000 элементов [banana, orange, orange, orange, banana, ...] в памяти, пока они не будут сброшены в базу данных. Разве это не неэффективно? Возможно, я могу сделать что-то вроде этого:

Flow[Fruits]
.grouped(100)
.map { items =>
    consolidate(items)  // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
    insertMapToDatabase(mapOfItems)
}

Насколько я понимаю, это также должно обрабатывать 10 000 элементов одновременно, но не должно занимать столько памяти (при условии, что элементы часто повторяются). Но каждая клавиша все равно повторяется в памяти 100 раз. Конечно, я могу сделать .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()... Но нет ли лучшего способа? Возможно что-то вроде этого:

Flow[Fruits]
.map { item =>
    addToMap(item)
    if(myMap.length == 10000) {
        insertToDatabase(myMap)
        clearMyMap()
    }
}

Но не нарушает ли это концепцию потоков Akka, а именно независимость (и, следовательно, параллелизм) этапов обработки?


person Honza Zíka    schedule 21.08.2017    source источник
comment
Посмотрите на функцию groupedWithin. Он принимает два параметра: максимальное количество элементов и скорость. Например, .groupedWithnin(5000, 1.seconds) даст 5000 элементов для обработки, если вы достигли его до 1 секунды, или даст количество элементов, накопленных за 1 секунду.   -  person alifirat    schedule 21.08.2017
comment
Спасибо @alifirat за ваше предложение, но это просто другой способ группировки. Что мне нужно, так это другой способ обработки данных, которые у меня есть, как для памяти, так и для базы данных.   -  person Honza Zíka    schedule 21.08.2017


Ответы (1)


Если количество элементов набора Fruit низкое, вы можете сохранить единую карту со всеми счетчиками, а затем сбросить ее в базу данных после потоковой передачи всех значений Fruit.

Во-первых, создайте поток, который будет вести текущий счет:

type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount = 
  Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
  (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)

Теперь создайте приемник, который будет получать последние FruitCount и материализовать поток:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] = 
  fruitSource
    .via(fruitCountFlow)
    .to(lastFruitCountSink)
    .run()

Затем lastFruitCountFut можно использовать для отправки значений в базу данных:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
  insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

Iterator используется, потому что это коллекция с наиболее эффективным использованием памяти для создания TraversableOnce элементов Fruit.

Это решение будет хранить в памяти только 1 Map, который будет иметь 1 ключ для каждого отдельного типа фруктов и 1 целое число для каждого ключа.

person Ramón J Romero y Vigil    schedule 22.08.2017