Spark: подключение к БД для каждого раздела Spark RDD и выполнение mapPartition

Я хочу сделать mapPartitions на моем искре rdd,

    val newRd = myRdd.mapPartitions(
      partition => {

        val connection = new DbConnection /*creates a db connection per partition*/

        val newPartition = partition.map(
           record => {
             readMatchingFromDB(record, connection)
         })
        connection.close()
        newPartition
      })

Но это дает мне исключение уже закрытого соединения, как и ожидалось, потому что до того, как элемент управления достигнет .map(), мой connection закрыт. Я хочу создать соединение для каждого раздела RDD и закрыть его должным образом. Как я могу этого добиться?

Спасибо!


person void    schedule 17.06.2016    source источник
comment
Я никогда не пробовал этого, но я считаю, что вам следует создавать свои связи за пределами закрытия и, таким образом, закрывать после его завершения.   -  person eliasah    schedule 17.06.2016
comment
Это привело бы к единственному соединению. Я хочу одно соединение на раздел   -  person void    schedule 17.06.2016
comment
Я знаю, что это приводит к одной связи. Я считаю, что вам следует попробовать обычный подход к DBConnection.   -  person eliasah    schedule 17.06.2016
comment
какую базу данных вы используете? нельзя ли это решить с помощью пула соединений (открытие соединения с минимальным количеством соединений (может быть = количеству разделов), повторное использование соединения при его закрытии)? где, когда вы закрываете соединение, это не фактическое закрытие соединения, вместо этого оно возвращается в пул. в некоторых базах данных вы также можете проверить, закрыто ли соединение или нет через его api   -  person Ram Ghadiyaram    schedule 17.06.2016


Ответы (1)


Как упоминалось в обсуждении здесь, проблема связана с ленивостью работы карты на итераторе partition. Эта лень означает, что для каждого раздела создается и закрывается соединение, и только позже (когда выполняется действие RDD) вызывается readMatchingFromDB.

Чтобы решить эту проблему, вы должны принудительно пройти итератор перед закрытием соединения, например преобразовав его в список (а затем обратно):

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close()
  newPartition.iterator // create a new iterator
})
person Tzach Zohar    schedule 20.06.2016
comment
Можете ли вы предоставить мне реальный пример вышеупомянутой настройки ghe conndctikn и т. Д., Пожалуйста? Или указать куда-нибудь? Спасибо - person thebluephantom; 01.05.2018
comment
установка соединения, простите за орфографию на мобильном - person thebluephantom; 02.05.2018