Запуск программы MapReduce на Hadoop выводит только половину моих данных

Я запускаю простую программу MapReduce на Hadoop, вычисляя минимальное, максимальное, медианное и стандартное отклонение значений из столбца набора данных. Когда я запускаю это локально на своем компьютере, я получаю окончательный результат, рассчитанный на основе всех значений набора данных. Однако, когда я запускаю это в Hadoop, результат соответствует примерно половине значений из столбца набора данных. Код ниже:

картограф.py

#!/usr/bin/env python3
import sys
import csv

# Load data
data = csv.DictReader(sys.stdin)

# Prints/Passes key-value to reducer.py
for row in data:
    for col, value in row.items():
        if col == sys.argv[1]:
            print('%s\t%s' % (col, value))

редуктор.py

#!/usr/bin/env python3
import sys
import statistics

key = None
current_key = None
num_list = []
for line in sys.stdin:
    # Remove leading and trailng whitespace
    line = line.strip()

    # Parse input 
    key, value = line.split('\t', 1)

    # Convert string to int
    try:
        value = float(value)
    except ValueError:
        # Skip the value
        continue

    if current_key == key:
        num_list.append(value)
    else:
        if current_key:
            print("Num. of Data Points %s\t --> Max: %s\t Min: %s\t Median: %s\t Standard Deviation: %s" \
                % (len(num_list), max(num_list), min(num_list), statistics.median(num_list), statistics.pstdev(num_list)))
        num_list.clear()
        num_list.append(value)
        current_key = key

# Output last value if needed
if current_key == key:
    print("Num. of Data Points %s\t --> Max: %s\t Min: %s\t Median: %s\t Standard Deviation: %s" \
                % (len(num_list), max(num_list), min(num_list), statistics.median(num_list), statistics.pstdev(num_list)))

Журнал Хаддопа:

2019-12-02 23:54:40,705 INFO mapreduce.Job: Running job: job_1575141442909_0026
2019-12-02 23:54:47,903 INFO mapreduce.Job: Job job_1575141442909_0026 running in uber mode : false
2019-12-02 23:54:47,906 INFO mapreduce.Job:  map 0% reduce 0%
2019-12-02 23:54:54,019 INFO mapreduce.Job:  map 100% reduce 0%
2019-12-02 23:54:59,076 INFO mapreduce.Job:  map 100% reduce 100%
2019-12-02 23:55:00,115 INFO mapreduce.Job: Job job_1575141442909_0026 completed successfully
2019-12-02 23:55:00,253 INFO mapreduce.Job: Counters: 54
        File System Counters
                FILE: Number of bytes read=139868
                FILE: Number of bytes written=968967
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=501097
                HDFS: Number of bytes written=114
                HDFS: Number of read operations=11
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
                HDFS: Number of bytes read erasure-coded=0
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=7492
                Total time spent by all reduces in occupied slots (ms)=2767
                Total time spent by all map tasks (ms)=7492
                Total time spent by all reduce tasks (ms)=2767
                Total vcore-milliseconds taken by all map tasks=7492
                Total vcore-milliseconds taken by all reduce tasks=2767
                Total megabyte-milliseconds taken by all map tasks=7671808
                Total megabyte-milliseconds taken by all reduce tasks=2833408
        Map-Reduce Framework
                Map input records=10408
                Map output records=5203
                Map output bytes=129456
                Map output materialized bytes=139874
                Input split bytes=220
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=139874
                Reduce input records=5203
                Reduce output records=1
                Spilled Records=10406
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=80
                CPU time spent (ms)=2790
                Physical memory (bytes) snapshot=676896768
                Virtual memory (bytes) snapshot=8266964992
                Total committed heap usage (bytes)=482344960
                Peak Map Physical memory (bytes)=253210624
                Peak Map Virtual memory (bytes)=2755108864
                Peak Reduce Physical memory (bytes)=173010944
                Peak Reduce Virtual memory (bytes)=2758103040
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=500877
        File Output Format Counters
                Bytes Written=114
2019-12-02 23:55:00,254 INFO streaming.StreamJob: Output directory: data/output

Локальный вывод:

# Data Points: 10407     Max: 89.77682042        Min: 13.87331897        Median: 46.44807153     Standard Deviation: 11.156280347146872

Выход Hadoop:

# Data Points: 5203      Max: 89.77682042        Min: 13.87331897        Median: 46.202181       Standard Deviation: 11.28118280525746

Как вы можете видеть, количество точек данных в выходных данных Hadoop составляет почти ровно половину полного количества точек данных из локального вывода. Я пытался использовать разные наборы данных с разным размером, и это все равно всегда половина... Я что-то делаю неправильно или что-то упускаю?


person Roberto Sciortino    schedule 03.12.2019    source источник
comment
@BenWatson Я так не думаю, поскольку мой локальный вывод предоставляет мне все необходимые точки данных, поэтому, почему он оценивается как false на hadoop, но не локально?   -  person Roberto Sciortino    schedule 03.12.2019
comment
Есть ли конкретная причина не делать этого в pyspark?   -  person OneCricketeer    schedule 03.12.2019
comment
@cricket_007 Это классное задание, требующее использования Hadoop.   -  person Roberto Sciortino    schedule 03.12.2019
comment
Spark может работать на Hadoop. Вы имеете в виду MapReduce?   -  person OneCricketeer    schedule 03.12.2019


Ответы (1)


Я понял, почему я получил такой вывод. Причина в том, что Hadoop разделял мои входные данные на две части для двух разных картографов, как я и подозревал. Однако только первая половина данных сохранила заголовки столбцов набора данных, поэтому, когда вторая половина набора данных читалась картографом, указанные столбцы не будут доступны.

Я удалил существующие заголовки из набора данных и установил имена полей при чтении данных, чтобы решить мою проблему:

data = csv.DictReader(sys.stdin, fieldnames=("col1", "col2", "col3", "col4", "col5"))
person Roberto Sciortino    schedule 03.12.2019