Многострочный ввод в Apache Spark с использованием java

Я просмотрел другие подобные вопросы, заданные уже на этом сайте, но не получил удовлетворительного ответа.

Я новичок в Apache Spark и Hadoop. Моя проблема в том, что у меня есть входной файл (35 ГБ), который содержит многострочные обзоры товаров на сайтах интернет-магазинов. Информация представлена ​​в файле, как показано ниже:

productId: C58500585F
product:  Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.

Это один блок обзора. Таких блоков, разделенных пустыми строками, тысячи. отсюда мне нужны productId, userId и оценка, поэтому я отфильтровал JavaRDD, чтобы иметь только те строки, которые мне нужны. поэтому это будет выглядеть следующим образом:

productId: C58500585F
userId: A3NM6WTIAE
score: 2.0

Код :

SparkConf conf = new SparkConf().setAppName("org.spark.program").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);

JavaRDD<String> input = context.textFile("path");

JavaRDD<String> requiredLines = input.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {
if(s.contains("productId") ||  s.contains("UserId") || s.contains("score") ||  s.isEmpty() ) {
        return false;
    }
    return true;
}
});

Теперь мне нужно прочитать эти три строки как часть одной пары (ключ, значение), но я не знаю, как это сделать. Между двумя блоками отзывов будет только пустая строка.

Я просмотрел несколько сайтов, но не нашел решения своей проблемы. Может ли кто-нибудь помочь мне с этим? Большое спасибо! Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.


person Student    schedule 14.10.2016    source источник
comment
Вы пробовали поиграть с textinputformat.record.delimiter? Что-то вроде этого. Это позволит вам получить RDD, где каждая запись состоит из целого блока текста.   -  person Junjun Olympia    schedule 14.10.2016
comment
@Student Разделены ли поля блока (productid, product... и т. д.) каким-либо разделителем?   -  person Shankar    schedule 14.10.2016
comment
@Студент: Вам также нужно map, а не filter   -  person Shankar    schedule 14.10.2016
comment
@Shankar Нет, единственный разделитель, который у них есть, это то, что они находятся на отдельных строках. поэтому они разделены только разделителем новой строки и никаким другим специальным разделителем.   -  person Student    schedule 14.10.2016
comment
@Junjun Olympia Я изучил это, но, как я уже сказал в вопросе, специального разделителя нет. Блоки разделены только пустой строкой.   -  person Student    schedule 14.10.2016
comment
@Student: После JavaRDD<String> input = context.textFile("path"); строки вы можете попытаться выполнить foreach входной rdd и сообщить мне, печатает ли он весь блок как одну запись или каждая строка в блоке идет как одна запись?   -  person Shankar    schedule 14.10.2016
comment
@shankar о, хорошо, но как мне использовать карту?   -  person Student    schedule 14.10.2016


Ответы (1)


Продолжая мои предыдущие комментарии, здесь можно использовать textinputformat.record.delimiter. Если единственным разделителем является пустая строка, значение должно быть установлено на "\n\n".

Рассмотрим эти тестовые данные:

productId: C58500585F
product:  Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.

productId: ABCDEDFG
product:  Teddy Bear
product/price: 6.50
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Second comment.

productId: 12345689
product:  Hot Wheels
product/price: 12.00
userId: JJ
profileName: JJ
helpfulness: 1/1
score: 4.0
time: 1624609
summary: Summarized
text: Some text

Тогда код (в Scala) будет выглядеть примерно так:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "\n\n")
val raw = sc.newAPIHadoopFile("test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)

val data = raw.map(e => {
  val m = e._2.toString
    .split("\n")
    .map(_.split(":", 2))
    .filter(_.size == 2)
    .map(e => (e(0), e(1).trim))
    .toMap

  (m("productId"), m("userId"), m("score").toDouble)
})

Выход:

data.foreach(println)
(C58500585F,A3NM6WTIAE,2.0)
(ABCDEDFG,A3NM6WTIAE,2.0)
(12345689,JJ,4.0)

Не был уверен, что именно вы хотите для вывода, поэтому я просто превратил его в кортеж из трех элементов. Кроме того, логика синтаксического анализа определенно может быть сделана более эффективной, если вам это нужно, но это должно дать вам над чем поработать.

person Junjun Olympia    schedule 17.10.2016