Spark RDD карта 1 към много

Нов съм в spark и имам проблем. Обработвам RDD, генериран с textFile(), който е csv файл. За всеки ред искам да върна няколко реда към нов RDD (единичен, а не няколко). Това е моят код:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
            new Function<String, Boolean>() {
                public Boolean call(String line) {
                    return line.contains("LinearAccelerationEvent");
                }
            }).map(
            new Function<String, LinearAccelerationEvent>() {
                public LinearAccelerationEvent call(String line) throws Exception {
                    String[] fields = line.split(",");
                    LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                    return linearAccelerationEvent;
                }
            }).cache();

Това, което правя тук, е да филтрирам първоначалния csv, за да получа само LinearAccelerationEvent, след което искам да съпоставя тези обекти към класа LinearAccelerationEvent и да генерирам нов RDD от обекти LinearAccelerationEvent. За всеки ред от първоначалния csv файл трябва да генерирам множество обекти LinearAccelerometerEvent, но не знам как да го направя. Причината, поради която искам да го направя, е, че по-късно този RDD ще бъде прехвърлен към cassandra по следния начин:

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();

Така че идеалното решение ще бъде нещо като:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String line) {
                        return line.contains("LinearAccelerationEvent");
                    }
                }).map(
                new Function<String, LinearAccelerationEvent>() {
                    public LinearAccelerationEvent call(String line) throws Exception {
                        String[] fields = line.split(",");
                        for() {
                           LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                           return linearAccelerationEvent;
                        }
                }
            }).cache();

Мога да използвам функцията foreachPartition() и да изпращам всяко събитие от for цикъла към Cassandra, но видях, че този подход е много по-бавен. Възможно ли е да не използвам foreach, за да правя това, което искам? Благодаря ти


person phcaze    schedule 29.11.2015    source източник


Отговори (1)


Ако ви разбирам правилно, върнете колекция (напр. списък) от LinearAccelerationEvent и извикайте flatMap вместо map. Това ще създаде стойност в получената RDD за всяко събитие на ускорение.

flatMap е същото като извикването на map, последвано от flatten. Ако сте запознати с Hive, това е подобно на използването на експлозивния DTF, наличен в HiveQL.

person ramblingpolak    schedule 29.11.2015
comment
Да, flatMap е отговорът, благодаря, човече! Ще публикувам решението на моя код. - person phcaze; 29.11.2015