Нов съм в spark и имам проблем. Обработвам RDD, генериран с textFile(), който е csv файл. За всеки ред искам да върна няколко реда към нов RDD (единичен, а не няколко). Това е моят код:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
Това, което правя тук, е да филтрирам първоначалния csv, за да получа само LinearAccelerationEvent, след което искам да съпоставя тези обекти към класа LinearAccelerationEvent и да генерирам нов RDD от обекти LinearAccelerationEvent. За всеки ред от първоначалния csv файл трябва да генерирам множество обекти LinearAccelerometerEvent, но не знам как да го направя. Причината, поради която искам да го направя, е, че по-късно този RDD ще бъде прехвърлен към cassandra по следния начин:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
Така че идеалното решение ще бъде нещо като:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
Мога да използвам функцията foreachPartition()
и да изпращам всяко събитие от for цикъла към Cassandra, но видях, че този подход е много по-бавен. Възможно ли е да не използвам foreach, за да правя това, което искам? Благодаря ти