Я пробовал простую программу Flink, которая просто берет файл, переворачивает строки в файле и записывает его.
Программа работает, только отдельные строки выходят из строя.
E.g.
Ввод файла
Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido
Выходной файл
Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF
Я ожидал:
Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF
Ниже приведена программа, которую я написал для этого:
package testflink;
import java.util.Iterator;
import java.util.StringJoiner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
System.err.println(env.getParallelism());
DataSource<String> file = env.readTextFile("./data.csv");
file.mapPartition((Iterable<String> values, Collector<String> out) -> {
System.err.println("************* " + out.hashCode() + " Begin");
Iterator<String> iterator = values.iterator();
while (iterator.hasNext()) {
String tuple = iterator.next();
System.err.println("************* " + out.hashCode() + tuple);
String[] split = tuple.split(",");
String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
}
System.err.println("************* " + out.hashCode() + " End");
}).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);
env.execute("Flink Batch Java API Skeleton");
System.out.println("Done");
}
}
- Можно ли сохранить порядок ввода? Есть ли хороший обходной путь?
- Я знаю, что читаю csv и разбиваю строки, когда доступен метод
readAsCsv()
. Проблема в том, что CSV может иметь динамическое количество столбцов на строку/кортеж. Я не смог понять, как преобразовать его в DataSource с динамическим количеством столбцов на кортеж. MapPartition нуждается в определенных типах - как я могу заменитьTuple0
-Tuple25
во время выполнения? - И последний вопрос — могу ли я ограничить раздел, чтобы он никогда не принимал более n значений в параметре
Iterable<String> values
?
Заранее спасибо! :)