У меня есть база данных Cassandra, из которой я проанализировал данные с помощью SparkSQL через Apache Spark. Теперь я хочу вставить эти проанализированные данные в PostgreSQL. Есть ли какие-либо способы добиться этого напрямую, кроме использования драйвера PostgreSQL (я добился этого с помощью postREST и Driver, я хочу знать, есть ли какие-либо методы, такие как saveToCassandra()
)?
Вставка аналитических данных из Spark в Postgres
Ответы (4)
На данный момент нет нативной реализации записи RDD ни к одной СУБД. Вот ссылки на соответствующие обсуждения в списке пользователей Spark: один, два
В общем, наиболее эффективным подходом будет следующий:
- Проверьте количество разделов в RDD, оно не должно быть ни слишком маленьким, ни слишком большим. 20-50 перегородок должно быть нормально, если число меньше - вызовите
repartition
с 20 перегородками, если больше - вызовитеcoalesce
до 50 перегородок - Вызовите преобразование
mapPartition
, внутри него вызовите функцию для вставки записей в вашу СУБД с помощью JDBC. В этой функции вы открываете соединение с вашей базой данных и используете команду COPY с этим API, это позволило бы вам исключить необходимость отдельной команды для каждой записи — таким образом вставка обрабатывалась бы намного быстрее
Таким образом, вы будете вставлять данные в Postgres параллельно, используя до 50 параллельных подключений (зависит от размера вашего кластера Spark и его конфигурации). Весь подход может быть реализован как функция Java/Scala, принимающая RDD и строку подключения.
Вы можете использовать API копирования Postgres, чтобы написать его, это намного быстрее. См. следующие два метода: один выполняет итерацию по RDD, чтобы заполнить буфер, который можно сохранить с помощью копирования API. Единственное, о чем вам нужно позаботиться, это создать правильный оператор в формате csv, который будет использоваться API-интерфейсом копирования.
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
Ответ 0x0FFF хороший. Вот дополнительный момент, который был бы полезен.
Я использую foreachPartition
для сохранения во внешнем хранилище. Это также соответствует шаблону проектирования Design Patterns for using foreachRDD
, приведенному в документации Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams
Пример:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Приведенные выше ответы относятся к старым версиям spark, в spark 2.* есть коннектор jdbc, позволяющий записывать напрямую в RDBS из фрейма данных.
пример:
jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html