Использование списка в качестве значения в MapReduce возвращает идентичные значения

У меня есть задание MapReduce, которое выводит IntWritable в качестве ключа и объекта Point (объект, который я создал, который реализует возможность записи) в качестве значения из функции карты. Затем в функции сокращения я использую цикл for-each для прохождения итерации Points для создания списка:

@Override
public void reduce(IntWritable key, Iterable<Point> points, Context context) throws IOException, InterruptedException {

    List<Point> pointList = new ArrayList<>();
    for (Point point : points) {
        pointList.add(point);
    }
    context.write(key, pointList);
}

Проблема в том, что этот список имеет правильный размер, но каждая точка точно такая же. Поля в моем классе Point не являются статическими, и я распечатал каждую точку в цикле по отдельности, чтобы убедиться, что точки уникальны (а они есть). Кроме того, я создал отдельный класс, который просто создает пару точек и добавляет их в список, и это, кажется, работает, что подразумевает, что MapReduce делает что-то, о чем я не знаю.

Любая помощь в исправлении этого будет принята с благодарностью.

ОБНОВЛЕНИЕ: Код для класса Mapper:

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private IntWritable firstChar = new IntWritable();
private Point point = new Point();

@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line, " ");

    while(tokenizer.hasMoreTokens()) {
        String atts = tokenizer.nextToken();
        String cut = atts.substring(1, atts.length() - 1);
        String[] nums = cut.split(",");

        point.set(Double.parseDouble(nums[0]), Double.parseDouble(nums[1]), Double.parseDouble(nums[2]), Double.parseDouble(nums[3]));
        context.write(one, point);
    }
}

Класс точки:

public class Point implements Writable {

public Double att1;
public Double att2;
public Double att3;
public Double att4;

public Point() {

}

public void set(Double att1, Double att2, Double att3, Double att4) {
    this.att1 = att1;
    this.att2 = att2;
    this.att3 = att3;
    this.att4 = att4;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeDouble(att1);
    dataOutput.writeDouble(att2);
    dataOutput.writeDouble(att3);
    dataOutput.writeDouble(att4);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
    this.att1 = dataInput.readDouble();
    this.att2 = dataInput.readDouble();
    this.att3 = dataInput.readDouble();
    this.att4 = dataInput.readDouble();
}

@Override
public String toString() {
    String output = "{" + att1 + ", " + att2 + ", " + att3 + ", " + att4 + "}";
    return output;
}

person user3354059    schedule 11.06.2015    source источник
comment
Пожалуйста, добавьте код для карты и уменьшите в соответствии с тем, как вы настраиваете карту и извлекаете в уменьшении. Также класс точек, который реализует Writable   -  person Ramzy    schedule 11.06.2015
comment
Только что обновил сообщение с классом Point и Mapper. Весь приведенный выше код — это все, что есть внутри каждого класса.   -  person user3354059    schedule 11.06.2015
comment
Попробуйте переместить точку point = new Point(); внутри карты и используя context.write(one, point); вне цикла while.   -  person Ramzy    schedule 11.06.2015


Ответы (1)


Проблема в вашем редукторе. Вы не хотите хранить все точки в памяти. Они могут быть, возможно, большими, и Hadoop решает это за вас (хотя и неуклюжим способом).

При циклическом просмотре заданного Iterable<Points> каждый экземпляр Point используется повторно, поэтому в данный момент времени поддерживается только один экземпляр.

Это означает, что при вызове points.next() произойдут следующие две вещи:

  1. Экземпляр Point используется повторно и устанавливается с данными следующей точки
  2. То же самое работает с экземпляром Key.

В вашем случае вы найдете в списке только один экземпляр Point, вставленный несколько раз и установленный с данными из последнего Point.

Вы не должны сохранять экземпляры Writables в редюсере или должны их клонировать.

Подробнее об этой проблеме можно прочитать здесь
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/

person vanekjar    schedule 11.06.2015
comment
Проблема в том, что когда я применяю это, я хочу сравнить каждую точку с другими точками в итерации, поэтому мне нужно иметь возможность хранить их и возвращаться к ним. Есть какой-либо способ сделать это? - person user3354059; 11.06.2015
comment
Вы не хотите хранить их в памяти. Как я уже сказал, MapReduce — это инструмент обработки больших данных — значения могут не помещаться в памяти. Как насчет использования точки в качестве ключа? Затем вы получите те же точки, сгруппированные и отсортированные в редюсере. - person vanekjar; 12.06.2015