Рассчитать среднее значение и стандартное отклонение по столбцам в Hadoop

Я хочу рассчитать средние значения и стандартное отклонение по столбцам в Hadoop.

Я просто использую однопроходный алгоритм Naïve для MapReduce. Я протестировал его на многомерных наборах данных 455000x90 и 650000x120 и получил ускорение ниже, еще ниже, чем количество процессоров. Для автономного и псевдораспределенного режима с 2 активными ядрами я получил ускорение 0,4 = 20 секунд / 53 секунды для 455000x90.

Почему моя программа не эффективна? Можно ли его улучшить?

Картограф:

public class CalculateMeanAndSTDEVMapper extends
       Mapper <LongWritable,
               DoubleArrayWritable,
               IntWritable,
               DoubleArrayWritable> {

    private int dataDimFrom;
    private int dataDimTo;
    private long samplesCount;
    private int universeSize;

@Override
protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    dataDimFrom = conf.getInt("dataDimFrom", 0);
    dataDimTo = conf.getInt("dataDimTo", 0);
    samplesCount = conf.getLong("samplesCount", 0);
    universeSize = dataDimTo - dataDimFrom + 1;
}

@Override
public void map(
        LongWritable key,
        DoubleArrayWritable array,
        Context context) throws IOException, InterruptedException {
    DoubleWritable[] outArray = new DoubleWritable[universeSize*2];
    for (int c = 0; c < universeSize; c++) {
        outArray[c] = new DoubleWritable(
                         array.get(c+dataDimFrom).get() / samplesCount);
    }
    for (int c = universeSize; c < universeSize*2; c++) {
        double val = array.get(c-universeSize+dataDimFrom).get();
        outArray[c] = new DoubleWritable((val*val) / samplesCount);
    }
    context.write(new IntWritable(1), new DoubleArrayWritable(outArray));
}

}

Комбинатор:

public class CalculateMeanAndSTDEVCombiner extends
       Reducer <IntWritable,
                DoubleArrayWritable,
                IntWritable,
                DoubleArrayWritable> {

   private int dataDimFrom;
   private int dataDimTo;
   private int universeSize;

@Override
protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    dataDimFrom = conf.getInt("dataDimFrom", 0);
    dataDimTo = conf.getInt("dataDimTo", 0);
    universeSize = dataDimTo - dataDimFrom + 1;
}

@Override
public void reduce(
        IntWritable column,
        Iterable<DoubleArrayWritable> partialSums,
        Context context) throws IOException, InterruptedException {
    DoubleWritable[] outArray = new DoubleWritable[universeSize*2];
    boolean isFirst = true;
    for (DoubleArrayWritable partialSum : partialSums) {
        for (int i = 0; i < universeSize*2; i++) {
            if (!isFirst) {
                outArray[i].set(outArray[i].get()
                                  + partialSum.get(i).get());
            } else {
                outArray[i]
                    = new DoubleWritable(partialSum.get(i).get());
            }
        }
        isFirst = false;
    }
    context.write(column, new DoubleArrayWritable(outArray));
}

}

Редуктор:

public class CalculateMeanAndSTDEVReducer extends
       Reducer <IntWritable,
                DoubleArrayWritable,
                IntWritable,
                DoubleArrayWritable> {

   private int dataDimFrom;
   private int dataDimTo;
   private int universeSize;

@Override
protected void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    dataDimFrom = conf.getInt("dataDimFrom", 0);
    dataDimTo = conf.getInt("dataDimTo", 0);
    universeSize = dataDimTo - dataDimFrom + 1;
}

@Override
public void reduce(
        IntWritable column,
        Iterable<DoubleArrayWritable> partialSums,
        Context context) throws IOException, InterruptedException {
    DoubleWritable[] outArray = new DoubleWritable[universeSize*2];
    boolean isFirst = true;
    for (DoubleArrayWritable partialSum : partialSums) {
        for (int i = 0; i < universeSize; i++) {
            if (!isFirst) {
                outArray[i].set(outArray[i].get() + partialSum.get(i).get());
            } else {
                outArray[i] = new DoubleWritable(partialSum.get(i).get());
            }
        }
        isFirst = false;
    }
    for (int i = universeSize; i < universeSize * 2; i++) {
        double mean = outArray[i-universeSize].get();
        outArray[i].set(Math.sqrt(outArray[i].get() - mean*mean));
    }
    context.write(column, new DoubleArrayWritable(outArray));
}

}

Где DoubleArrayWritable — это простой класс, расширяющий ArrayWritable:

public class DoubleArrayWritable extends ArrayWritable {

public DoubleArrayWritable() {
    super(DoubleWritable.class);
}

public DoubleArrayWritable(DoubleWritable[] values) {
    super(DoubleWritable.class, values);
}

public DoubleWritable get(int idx) {
    return (DoubleWritable) get()[idx];
}

}

person Pavel Nuzhdin    schedule 06.11.2011    source источник
comment
Почему вы делаете это «context.write(new IntWritable(1), new DoubleArrayWritable(outArray))» в вашем картографе? Итак, ваш столбец всегда равен 1?   -  person Thomas Jungblut    schedule 06.11.2011
comment
@ThomasJungblut Я вижу здесь два решения: 1) Mapper создает пары: {1, {partMean1, partMean2, ... partMeanK, partSumSquare1, partSumSquare2, ... partSumSquareK}} (которые я описал здесь) 2) Mapper создает пары: {1, {partMean1, partSumSquare1}}, {2, {partMean2, partSumSquare2}}, ... {K, {partMeanK, partSumSquareK}} где K - количество переменных (столбцов) в данных. Я протестировал оба, но не получил хорошего ускорения в результатах.   -  person Pavel Nuzhdin    schedule 07.11.2011


Ответы (1)


Я задал вопрос о другой работе в той же среде с той же проблемой. Дэвид Грузман угадал проблему в разнице времени запуска задания (локальное, кластерное). Он предложил оптимальный размер данных, чтобы увидеть хорошее ускорение (5 ГБ) в этой среде. Я пробовал, и это правда.

Почему работа только с картографами такая медленная в реальном кластер?

person Pavel Nuzhdin    schedule 21.11.2011