Функционална обработка на курсор на база данни в Scala

Когато трябва да прочета милиони редове от база данни от база данни на PostgreSQL, използвайки JDBC драйвера, винаги използвам курсор, в противен случай ще получа OutOfMemoryError. Ето шаблона (псевдокод), който използвам:

begin transaction
execute("declare cursor...")
while (true) {
  boolean processedSomeRows = false
  resultSet = executeQuery("fetch forward...")
  while (resultSet.next()) {
    processedSomeRows = true
    ...
  }
  if (!processedSomeRows) break
}
close cursor
commit

Това е по-„функционален“ еквивалент, който измислих, за да бъде внедрен в Scala:

begin transaction
execute("declare cursor...")

@tailrec
def loop(resultSet: ResultSet,
         processed: Boolean): Boolean = {
  if (!resultSet.next()) processed
  else {
    // Process current result set row
    loop(resultSet, true)
  }
}

while (loop(executeQuery("fetch forward..."), false))
  ; //Empty loop

close cursor
commit

Знам, че това е измислено, но има ли по-добър начин, без да се прибягва до променливост? Ако се опитвах да направя това в Haskell, можех да измисля решение, което включва монади, но не искам да изпращам ума си в тези „изкривени малки пасажи, всички еднакви“, защото може никога да не се върне...


person Ralph    schedule 08.10.2012    source източник
comment
Моята незабавна реакция е, че Iterator[ResultSet], захранващ .map() върху каквато и да е функцията за текущ набор от резултати на вашия процес, би бил част от по-чисто решение... някаква причина сте отхвърлили този подход?   -  person timday    schedule 08.10.2012
comment
Обикновено използвам Iterators или Streams за обработка на ResultSets, но не мога да измисля начин да спра итерацията (processed аргумент по-горе).   -  person Ralph    schedule 09.10.2012
comment
Използвайте takeWhile на итератора и той ще спре да изчертава нови стойности, когато каквато и функция да поставите в него стане невярно. Дали този въпрос всъщност е за това как да се изпълни код за почистване, когато итераторът приключи с?   -  person timday    schedule 09.10.2012
comment
Ще трябва да помисля за това (използвайки takeWhile), за да видя дали мога да прекратя външния while цикъл.   -  person Ralph    schedule 09.10.2012


Отговори (2)


Ето едно решение на Scala, което измислих:

@tailrec
def processCursor(query: => ResultSet)(process: ResultSet => Unit) {
  @tailrec
  def loop(resultSet: ResultSet,
            processed: Boolean): Boolean = {
    if (!resultSet.next()) processed
    else {
      process
      loop(resultSet, true)
    }
  }
  if (loop(query, false)) processCursor(query)(process)
}

Наречете го така:

begin transaction
execute("declare cursor...")

processCursor(statement.executeQuery("fetch forward...")) {
  resultSet =>
  // process current row of the ResultSet
}

close cursor
commit

Как може да се подобри това?

person Ralph    schedule 09.10.2012

Алтернативен начин, базиран на @Ralph отговор:

  def processCursor[T](resultSet: ResultSet)(process: ResultSet => T) = {
    @tailrec
    def loop(seq: Seq[T], resultSet: ResultSet): Seq[T] = {
      if (resultSet.next()) loop(seq :+ process(resultSet), resultSet)
      else seq
    }
    loop(Seq.empty, resultSet)
  }
person Fede    schedule 03.07.2015