Python MapReduce Hadoop Streaming Job, которому требуется несколько входных файлов?

У меня есть два файла в моем кластере File A и File B со следующими данными:

Файл А

#Format: 
#Food Item | Is_A_Fruit (BOOL)

Orange | Yes
Pineapple | Yes
Cucumber | No
Carrot | No
Mango | Yes

Файл B

#Format:
#Food Item | Vendor Name

Orange | Vendor A
Pineapple | Vendor B
Cucumber | Vendor B
Carrot | Vendor B
Mango | Vendor A

Обычно я хочу узнать Сколько фруктов продает каждый продавец?

Ожидаемый результат:

Vendor A | 2
Vendor B | 1

Мне нужно сделать это с помощью сокращения карты python для потоковой передачи hadoop.

Я прочитал, как выполнить базовый подсчет слов, я прочитал из sys.stdin и испускаю k,v пар для редуктора, чтобы затем уменьшить.

Как мне подойти к этой проблеме?

Меня больше всего беспокоит то, как читать из нескольких файлов, а затем сравнивать их в Hadoop Streaming.

Я могу сделать это на обычном питоне (то есть без MapReduce и Hadoop, это просто.), Но это невозможно из-за огромного размера данных, которые у меня с собой.


person ComputerFellow    schedule 27.12.2013    source источник


Ответы (4)


Взгляните на этот пример, поскольку он имеет прямое отношение к тому, что вы ищете.

person Vishal    schedule 28.12.2013
comment
+1. Первоначально я немного скептически относился к тому, что пример действительно работал так, как было предложено, потому что автор закапывает важную часть, например, разделение разделения и сортировки, которое выполняется с помощью -jobconf stream.num.map.output.key.fields = 4 - jobconf map.output.key.field.separator = ^ -jobconf num.key.fields.for.partition = 1. - person cohoz; 01.01.2014

Неужели файл А такой большой? Я бы поместил его в DistributedCache и прочитал оттуда. Чтобы поместить его в распределенный кеш, используйте эту опцию в вызове потоковой передачи Hadoop:

-cacheFile 'hdfs://namenode:port/the/hdfs/path/to/FileA#FileA'

(Я полагаю, что следующее тоже должно работать, но я не пробовал :)

-cacheFile '/the/hdfs/path/to/FileA#FileA'

Обратите внимание, что #fileA - это имя, которое вы используете, чтобы сделать файл доступным для ваших картографов.

Затем в вашем картографе вы прочитаете FileB из sys.stdin (предполагая, что вы вызвали Hadoop Streaming с помощью -input '/user/foo/FileB') И, чтобы прочитать FileA, вы должны сделать что-то вроде этого:

f = open('FileA', 'r')
...
f.readline()

Теперь, я полагаю, вы уже подумали об этом, но для меня было бы разумно иметь такой картограф:

  1. Открыть FileA
  2. Прочтите FileA построчно (в цикле) и загрузите его в карту, чтобы вы могли легко найти ключ и найти его значение (да, нет).
  3. Попросите ваш основной цикл читать из stdin. Внутри цикла для каждой строки (в FileB) проверьте свою карту (см. Шаг 2), чтобы узнать, есть ли у вас фрукт или нет ... и т. Д.
person cabad    schedule 27.12.2013

Один из подходов состоит в том, чтобы сделать это как две работы.

  1. Filter FileB so that only rows containing fruits are retained
    • Map1: A composite key of "Food Item" and which file the data came from. Partition on "Food Item", with a secondary sort whether the row contains "Is_A_Fruit" information (to ensure that this is read first by the reducer for each food item).
    • Reduce1: при вторичной сортировке первая строка в отсортированных данных будет указывать, является ли этот элемент питания фруктом (в этом случае редуктор выведет его) или нет (в этом случае это не так).
  2. Use the vendor as key to count the number of fruits per vendor.
    • The MapReduce output from the first job here now has the same structure as FileB, but all rows are Fruits, so this is more like wordcount, with Vendor Name as the key, and then count the number of rows.
    • Если вам нужны уникальные фрукты, вам может потребоваться снова использовать вторичную сортировку, чтобы избавиться от необходимости загружать в память все фрукты, связанные с каждым поставщиком.

Тем не менее: решение, которое предлагает @cabad, лучше всего, если файл достаточно мал.

Если нет, то лучше всего использовать вторичные сортировки. Взгляните на это руководство, предложенное в ответе @ Simplefish здесь, чтобы узнать, как выполнять вторичную сортировку в разделе (это ключевые слова, которые укажут вам правильное направление, чтобы сделать то, что вы хотите: сделайте гарантии о порядке данных, связанных с данным ключом, который передается в редуктор).

И последнее замечание: ваша проблема заключается не в том, «Как читать из нескольких файлов», поскольку любое решение, которое вы разрабатываете, не может полагаться на знание того, из какого файла поступает ввод (вам нужно будет полагаться на структуру данных, хотя это не проблема в этом примере).

person cohoz    schedule 31.12.2013

Вы предоставите в hadoop только каталог, содержащий файлы. Фреймворк Hadoop их прочитает. Вы этого не сделаете.

Hadoop применит код карты ко всему содержимому файлов.

Затем hadoop применит закодированный вами класс уменьшения ко всем выходным данным из класса карты.

person Siva Tumma    schedule 03.01.2014