Карта Spark RDD от 1 до многих

Я новичок в искрах, и у меня есть проблема. Я обрабатываю RDD, созданный с помощью textFile(), который представляет собой файл csv. Для каждой строки я хочу вернуть несколько строк в новый RDD (одну, а не несколько). Это мой код:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
            new Function<String, Boolean>() {
                public Boolean call(String line) {
                    return line.contains("LinearAccelerationEvent");
                }
            }).map(
            new Function<String, LinearAccelerationEvent>() {
                public LinearAccelerationEvent call(String line) throws Exception {
                    String[] fields = line.split(",");
                    LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                    return linearAccelerationEvent;
                }
            }).cache();

Что я здесь делаю, так это фильтрую исходный csv, чтобы получить только LinearAccelerationEvent, затем я хочу сопоставить эти объекты с классом LinearAccelerationEvent и сгенерировать новый RDD объектов LinearAccelerationEvent. Для каждой строки исходного CSV-файла мне нужно создать несколько объектов LinearAccelerometerEvent, но я не знаю, как это сделать. Причина, по которой я хочу это сделать, заключается в том, что позже этот RDD будет передан в cassandra следующим образом:

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();

Таким образом, идеальным решением будет что-то вроде:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String line) {
                        return line.contains("LinearAccelerationEvent");
                    }
                }).map(
                new Function<String, LinearAccelerationEvent>() {
                    public LinearAccelerationEvent call(String line) throws Exception {
                        String[] fields = line.split(",");
                        for() {
                           LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
                           return linearAccelerationEvent;
                        }
                }
            }).cache();

Я могу использовать функцию foreachPartition() и отправлять каждое событие цикла for в Cassandra, но я видел, что этот подход намного медленнее. Возможно ли, чтобы пользователь foreach не делал то, что я хочу? Спасибо


person phcaze    schedule 29.11.2015    source источник


Ответы (1)


Если я вас правильно понимаю, верните коллекцию (например, список) LinearAccelerationEvent и вызовите flatMap вместо map. Это создаст значение в результирующем RDD для каждого события ускорения.

flatMap аналогичен вызову map с последующим flatten. Если вы знакомы с Hive, это похоже на использование DTF взрыва, доступного в HiveQL.

person ramblingpolak    schedule 29.11.2015
comment
Да, flatMap - это ответ, спасибо, чувак! Я опубликую решение для моего кода. - person phcaze; 29.11.2015