Функциональная обработка курсора базы данных в Scala

Когда мне нужно прочитать миллионы строк базы данных из базы данных PostgreSQL с помощью драйвера JDBC, я всегда использую курсор, иначе я получу ошибку OutOfMemoryError. Вот шаблон (псевдокод), который я использую:

begin transaction
execute("declare cursor...")
while (true) {
  boolean processedSomeRows = false
  resultSet = executeQuery("fetch forward...")
  while (resultSet.next()) {
    processedSomeRows = true
    ...
  }
  if (!processedSomeRows) break
}
close cursor
commit

Это более «функциональный» эквивалент, который я придумал для реализации в Scala:

begin transaction
execute("declare cursor...")

@tailrec
def loop(resultSet: ResultSet,
         processed: Boolean): Boolean = {
  if (!resultSet.next()) processed
  else {
    // Process current result set row
    loop(resultSet, true)
  }
}

while (loop(executeQuery("fetch forward..."), false))
  ; //Empty loop

close cursor
commit

Я знаю, что это надумано, но есть ли лучший способ, не прибегая к изменчивости? Если бы я пытался сделать это на Haskell, я мог бы придумать решение, включающее монады, но я не хочу посылать свой разум по этим "извилистым маленьким проходам, похожим друг на друга", потому что он может никогда не вернуться...


person Ralph    schedule 08.10.2012    source источник
comment
Моя немедленная реакция заключается в том, что Iterator[ResultSet], передающий .map() поверх любой функции текущего набора результатов вашего процесса, будет частью более чистого решения ... по какой причине вы отвергли этот подход?   -  person timday    schedule 08.10.2012
comment
Обычно я использую либо Iterators, либо Streams для обработки ResultSets, но я не могу придумать способ остановить итерацию (аргумент processed выше).   -  person Ralph    schedule 09.10.2012
comment
Используйте takeWhile на итераторе, и он перестанет рисовать новые значения, когда любая функция, которую вы в него вставили, станет ложной. Действительно ли этот вопрос о том, как выполнить код очистки, когда итератор завершен с помощью ?   -  person timday    schedule 09.10.2012
comment
Мне придется подумать об этом (используя takeWhile), чтобы посмотреть, смогу ли я завершить внешний цикл while.   -  person Ralph    schedule 09.10.2012


Ответы (2)


Вот решение Scala, которое я придумал:

@tailrec
def processCursor(query: => ResultSet)(process: ResultSet => Unit) {
  @tailrec
  def loop(resultSet: ResultSet,
            processed: Boolean): Boolean = {
    if (!resultSet.next()) processed
    else {
      process
      loop(resultSet, true)
    }
  }
  if (loop(query, false)) processCursor(query)(process)
}

Назовите это так:

begin transaction
execute("declare cursor...")

processCursor(statement.executeQuery("fetch forward...")) {
  resultSet =>
  // process current row of the ResultSet
}

close cursor
commit

Как это можно улучшить?

person Ralph    schedule 09.10.2012

Альтернативный способ, основанный на ответе @Ralph:

  def processCursor[T](resultSet: ResultSet)(process: ResultSet => T) = {
    @tailrec
    def loop(seq: Seq[T], resultSet: ResultSet): Seq[T] = {
      if (resultSet.next()) loop(seq :+ process(resultSet), resultSet)
      else seq
    }
    loop(Seq.empty, resultSet)
  }
person Fede    schedule 03.07.2015