Как сгенерировать сложный XML с помощью Spark-Xml

Я пытаюсь сгенерировать сложный xml из моей книги JavaRDd‹ > и обзоров JavaRdd‹ > Как я могу объединить эти два для создания XML ниже?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>

Как видите, существует родительская корневая библиотека, в которой есть дочерние книги и обзоры.

Ниже показано, как я генерирую Book and Review Dataframe.

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);

Я знаю, как сгенерировать xml, и я сомневаюсь, в частности, в том, что у него есть корневой тег библиотеки и наличие книг и обзоров в качестве его дочернего элемента.

Я использую Java. но если бы вы могли указать мне правильно, вы можете написать пример Scala или Python.


comment
Это не сработает. Корневой тег прост (для этого есть опция), но вы не можете иметь разные типы объектов (разные схемы) в одном DataFrame.   -  person Alper t. Turker    schedule 16.04.2018
comment
@user9613318 user9613318, так должен ли я выполнить конкатенацию строк после создания двух разных xml? это единственный выход?   -  person Punith Raj    schedule 17.04.2018
comment
@ user9613318 не могли бы вы помочь мне с этим? stackoverflow.com/questions/50007809/   -  person Punith Raj    schedule 24.04.2018
comment
@ user9613318, не могли бы вы помочь мне с этим заголовком stackoverflow.com/questions/50131641/   -  person Punith Raj    schedule 02.05.2018


Ответы (1)


Это может быть не самый эффективный способ использования Spark для этого, но приведенный ниже код работает так, как вы хотите. (хотя Scala, потому что моя Java немного заржавела)

import java.io.{File, PrintWriter}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[3]")
  .appName("test")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

import spark.implicits._

/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)


val bookFrame = List(
  Book("book1"),
  Book("book2"),
  Book("book3"),
  Book("book4"),
  Book("book5")
).toDS()


val reviewFrame = List(
  Review(1),
  Review(2),
  Review(3),
  Review(4)
).toDS()

/* End test code **/

// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode(SaveMode.Overwrite)
  .save("/tmp/books/") // store to temp location

// Same for reviews
reviewFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .mode(SaveMode.Overwrite)
  .save("/tmp/review") // store to temp location


def concatFiles(path:String):List[String] = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .flatMap(file => Source.fromFile(file.getAbsolutePath).getLines()) 
    .map("    " + _) // prefix with spaces to allow for new root level xml
    .toList
}

val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml"){
  write(lines.mkString("\n"))
  close
}

Результат:

<xml>
<library>
    <books>
        <book>
            <author>book1</author>
        </book>
        <book>
            <author>book2</author>
        </book>
        <book>
            <author>book3</author>
        </book>
        <book>
            <author>book4</author>
        </book>
        <book>
            <author>book5</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
        <review>
            <id>2</id>
        </review>
        <review>
            <id>3</id>
        </review>
        <review>
            <id>4</id>
        </review>
    </reviews>
</library>

Другой подход может состоять в том, чтобы (использовать только искру) создать новый объект case class BookReview(books: List[Book], reviews: List[Review]) и сохранить его в xml после .collect() всех книг и обзоров в одном списке.

Хотя тогда я бы не использовал spark для обработки одной записи (BookReview), а использовал бы обычную библиотеку xml (например, xstream или около того) для хранения этого объекта.


Обновить. Методы concat списка не являются дружественными к памяти, поэтому использование потоков и буферов может быть решением вместо метода concatFiles.

def outputConcatFiles(path: String, outputFile: File): Unit = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .foreach(file => {
      val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))

      try {
        Stream.continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach(line =>
            writer.write(s"    $line\n".getBytes)
          )
      } catch {
        case e: Exception => println(e.getMessage)
      } finally {
        writer.close()
        reader.close()
      }
    })
}

val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) { write("<xml>\n<library>\n"); close}
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) { append("</library>"); close}
person Tom Lous    schedule 17.04.2018
comment
Спасибо за ответ. У меня есть огромная коллекция записей, где-то около пятисот тысяч, поэтому я не могу взять ее в список. В текущей конфигурации он выдаст OutOfMemory... но я попробую конкатенацию, которую вы объяснили. - person Punith Raj; 17.04.2018
comment
Хорошая точка зрения. Я обновил свой ответ, чтобы разрешить FileStreams и т. д. - person Tom Lous; 17.04.2018
comment
не могли бы вы помочь мне с этим? stackoverflow.com/questions/50007809/ - person Punith Raj; 24.04.2018
comment
Том Лус stackoverflow.com/questions/50131641/, вы можете мне помочь с этим - person Punith Raj; 02.05.2018