Apache Flink: обработка данных по порядку с помощью mapPartition

Я пробовал простую программу Flink, которая просто берет файл, переворачивает строки в файле и записывает его.

Программа работает, только отдельные строки выходят из строя.

E.g.

Ввод файла

Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido

Выходной файл

Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF

Я ожидал:

Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF

Ниже приведена программа, которую я написал для этого:

package testflink;

import java.util.Iterator;
import java.util.StringJoiner;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.err.println(env.getParallelism());
        DataSource<String> file = env.readTextFile("./data.csv");
        file.mapPartition((Iterable<String> values, Collector<String> out) -> {
            System.err.println("************* " + out.hashCode() + " Begin");
            Iterator<String> iterator = values.iterator();
            while (iterator.hasNext()) {
                String tuple = iterator.next();
                System.err.println("************* " + out.hashCode() + tuple);
                String[] split = tuple.split(",");
                String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
                out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
            }
            System.err.println("************* " + out.hashCode() + " End");
        }).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);
        env.execute("Flink Batch Java API Skeleton");
        System.out.println("Done");
    }
}
  • Можно ли сохранить порядок ввода? Есть ли хороший обходной путь?
  • Я знаю, что читаю csv и разбиваю строки, когда доступен метод readAsCsv(). Проблема в том, что CSV может иметь динамическое количество столбцов на строку/кортеж. Я не смог понять, как преобразовать его в DataSource с динамическим количеством столбцов на кортеж. MapPartition нуждается в определенных типах - как я могу заменить Tuple0 - Tuple25 во время выполнения?
  • И последний вопрос — могу ли я ограничить раздел, чтобы он никогда не принимал более n значений в параметре Iterable<String> values?

Заранее спасибо! :)


person Vineet    schedule 30.06.2017    source источник
comment
Помимо этого? :)   -  person Vineet    schedule 30.06.2017
comment
stackoverflow.com/questions/34071445/   -  person David Anderson    schedule 01.07.2017


Ответы (1)


mapPartition Flink поддерживает порядок записей в каждом параллельном разделе. Однако проблема в вашем случае использования заключается в том, как данные распределяются между параллельными задачами оператора MapPartition.

Вы используете TextInputFormat, который делит входной файл на несколько входных разделов, которые независимо обрабатываются параллельными экземплярами оператора источника данных. Каждый экземпляр источника данных локально пересылает все свои записи последующему оператору MapPartition, и это пересылает его записи результатов в приемник. Конвейер выглядит так:

source_1 -> mapPartition_1 -> sink_1
source_2 -> mapPartition_2 -> sink_2
source_3 -> mapPartition_3 -> sink_3
...

Итак, начиная с исходного кода, все записи обрабатываются по порядку. Однако, поскольку входные группы разделения назначаются исходным задачам случайным образом, а приемники работают независимо (без координации), выходные данные упорядочены лишь частично (записи, считанные из одной группы, упорядочены).

Установка параллелизма источника на 1 не поможет, потому что он будет отправлять свои записи результатов последующим задачам в циклическом режиме, чтобы использовать параллелизм последующего оператора. Также установка параллелизма всего задания на 1 не помогает, потому что разбиения все еще могут обрабатываться в случайном порядке одной исходной задачей. Единственное известное мне решение — пронумеровать каждую входную запись и сортировать по этому номеру (с разбиение диапазона для параллельной обработки) перед записью результата.

person Fabian Hueske    schedule 03.07.2017
comment
Теперь у меня есть csv, по которому я отсортировал. Первый столбец - это номера строк. Он сортируется лексикографически. Как я могу сделать это правильной числовой сортировкой? - person Vineet; 02.08.2017
comment
разделите строку на Tuple2<Integer, String>, где номер строки — целое число, а оставшаяся строка — строка. Затем вы можете отсортировать по полю Integer. - person Fabian Hueske; 02.08.2017