Apache Flink: искаженное распределение данных в KeyedStream

У меня есть этот Java-код во Flink:

env.setParallelism(6);

//Read from Kafka topic with 12 partitions
DataStream<String> line = env.addSource(myConsumer);

//Filter half of the records 
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder());

//Filter the other half
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder());

//Join all the data again
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2);

//Window
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U
                .keyBy(1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new Reducer());

Проблема в том, что окно должно иметь возможность обрабатывать с parallelism = 2, поскольку есть две разные группы данных с ключами «odd» и «even» во второй String в Tuple3. Все работает с parallelism 6, но не с окном, которое работает с parallelism = 1, и мне просто нужно, чтобы он имел parallelism = 2 из-за моих требований.

В коде используются следующие функции:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
};


public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
        return isEven;
    }
};

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "odd", line.f1);
        return newLine;
    }
};


public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "even", line.f1);
        return newLine;
    }
};

public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1,
            Tuple3<String, String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
        Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(String.valueOf(sum) +
                " " + String.valueOf(sumTS), line1.f1, line1.f2 + line2.f2);
        return newLine;
    }
};

Спасибо за вашу помощь!

РЕШЕНИЕ. Я изменил содержание ключей с «нечетное» и «четное» на «нечетное» и «четное1111», и теперь оно работает правильно.


person froblesmartin    schedule 22.06.2017    source источник


Ответы (1)


Ключи распределяются между рабочими потоками посредством хеш-секционирования. Это означает, что значения ключей хешируются, а поток определяется по модулю #workers. С двумя ключами и двумя потоками велика вероятность того, что оба ключа назначены одному и тому же потоку.

Вы можете попробовать использовать разные ключевые значения, хеш-значения которых распределяются по обоим потокам.

person Fabian Hueske    schedule 22.06.2017
comment
Спасибо!! Я изменил его с нечетного / четного на нечетный / четный1111, и теперь он работает: D. Единственное, что у меня есть два рабочих, и оба потока находятся на одной машине, есть ли способ заставить каждый поток помещать в разные рабочие? - person froblesmartin; 23.06.2017
comment
Это зависит от ваших настроек. Вы можете запускать воркеры с одним слотом, но в зависимости от того, используете ли вы YARN, Mesos или что-то еще, вы не можете контролировать, где будут запускаться воркеры. - person Fabian Hueske; 23.06.2017
comment
Я работаю в автономном режиме. Да, я подумал об одном слоте для задачи, но я хотел бы иметь некоторую производительность ... хахах, я оставлю его таким, но пропускная способность ограничена одним из двух рабочих :( - person froblesmartin; 23.06.2017