Вставка аналитических данных из Spark в Postgres

У меня есть база данных Cassandra, из которой я проанализировал данные с помощью SparkSQL через Apache Spark. Теперь я хочу вставить эти проанализированные данные в PostgreSQL. Есть ли какие-либо способы добиться этого напрямую, кроме использования драйвера PostgreSQL (я добился этого с помощью postREST и Driver, я хочу знать, есть ли какие-либо методы, такие как saveToCassandra())?


person Justin    schedule 03.02.2015    source источник


Ответы (4)


На данный момент нет нативной реализации записи RDD ни к одной СУБД. Вот ссылки на соответствующие обсуждения в списке пользователей Spark: один, два

В общем, наиболее эффективным подходом будет следующий:

  1. Проверьте количество разделов в RDD, оно не должно быть ни слишком маленьким, ни слишком большим. 20-50 перегородок должно быть нормально, если число меньше - вызовите repartition с 20 перегородками, если больше - вызовите coalesce до 50 перегородок
  2. Вызовите преобразование mapPartition, внутри него вызовите функцию для вставки записей в вашу СУБД с помощью JDBC. В этой функции вы открываете соединение с вашей базой данных и используете команду COPY с этим API, это позволило бы вам исключить необходимость отдельной команды для каждой записи — таким образом вставка обрабатывалась бы намного быстрее

Таким образом, вы будете вставлять данные в Postgres параллельно, используя до 50 параллельных подключений (зависит от размера вашего кластера Spark и его конфигурации). Весь подход может быть реализован как функция Java/Scala, принимающая RDD и строку подключения.

person 0x0FFF    schedule 03.02.2015

Вы можете использовать API копирования Postgres, чтобы написать его, это намного быстрее. См. следующие два метода: один выполняет итерацию по RDD, чтобы заполнить буфер, который можно сохранить с помощью копирования API. Единственное, о чем вам нужно позаботиться, это создать правильный оператор в формате csv, который будет использоваться API-интерфейсом копирования.

def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
        val sb = mutable.StringBuilder.newBuilder
        val now = System.currentTimeMillis()

        rdd.collect().foreach(itr => {
            itr.foreach(_.createCSV(sb, now).append("\n"))
        })

        copyIn("myTable",  new StringReader(sb.toString), "statement")
        sb.clear
    }


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
        val conn = connectionPool.getConnection()
        try {
            conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => logWarning(se.getMessage)
            case t: Throwable => logWarning(t.getMessage)
        } finally {
            conn.close()
        }
    }
person smishra    schedule 26.05.2015
comment
не будет ли буфер sb StringBuilder неограниченно расти в соответствии с количеством записей в EventModel RDD? почему у тебя не кончается память? - person nont; 25.08.2015
comment
Я использую это для своего решения, которое работает уже несколько месяцев, и до сих пор я не видел, чтобы оно вышло из памяти. Объем данных, которые у меня есть, тоже весьма значителен - 100000/сек. Более того, если вас это беспокоит, вы всегда можете провести еще одну проверку, на основании которой вы вызываете copyIn и очищаете буфер. - person smishra; 09.09.2015

Ответ 0x0FFF хороший. Вот дополнительный момент, который был бы полезен.

Я использую foreachPartition для сохранения во внешнем хранилище. Это также соответствует шаблону проектирования Design Patterns for using foreachRDD, приведенному в документации Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

Пример:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
person jsr    schedule 22.03.2015

Приведенные выше ответы относятся к старым версиям spark, в spark 2.* есть коннектор jdbc, позволяющий записывать напрямую в RDBS из фрейма данных.

пример:

jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

person RELW    schedule 25.05.2019