Я хочу сделать mapPartitions на моем искре rdd,
val newRd = myRdd.mapPartitions(
partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})
Но это дает мне исключение уже закрытого соединения, как и ожидалось, потому что до того, как элемент управления достигнет .map()
, мой connection
закрыт. Я хочу создать соединение для каждого раздела RDD и закрыть его должным образом. Как я могу этого добиться?
Спасибо!