Я новичок в искрах, и у меня есть проблема. Я обрабатываю RDD, созданный с помощью textFile(), который представляет собой файл csv. Для каждой строки я хочу вернуть несколько строк в новый RDD (одну, а не несколько). Это мой код:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
Что я здесь делаю, так это фильтрую исходный csv, чтобы получить только LinearAccelerationEvent, затем я хочу сопоставить эти объекты с классом LinearAccelerationEvent и сгенерировать новый RDD объектов LinearAccelerationEvent. Для каждой строки исходного CSV-файла мне нужно создать несколько объектов LinearAccelerometerEvent, но я не знаю, как это сделать. Причина, по которой я хочу это сделать, заключается в том, что позже этот RDD будет передан в cassandra следующим образом:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
Таким образом, идеальным решением будет что-то вроде:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
Я могу использовать функцию foreachPartition()
и отправлять каждое событие цикла for в Cassandra, но я видел, что этот подход намного медленнее. Возможно ли, чтобы пользователь foreach не делал то, что я хочу? Спасибо