Добавить столбец агрегирования в фрейм данных Spark

У меня есть фрейм данных Spark, который выглядит так:

| id | value | bin |
|----+-------+-----|
|  1 |   3.4 |   2 |
|  2 |   2.6 |   1 |
|  3 |   1.8 |   1 |
|  4 |   9.6 |   2 |

У меня есть функция f, которая принимает массив значений и возвращает число. Я хочу добавить столбец в приведенный выше фрейм данных, где значение для нового столбца в каждой строке является значением f для всех записей value, имеющих одинаковую запись bin, то есть:

| id | value | bin | f_value       |
|----+-------+-----+---------------|
|  1 |   3.4 |   2 | f([3.4, 9.6]) |
|  2 |   2.6 |   1 | f([2.6, 1.8]) |
|  3 |   1.8 |   1 | f([2.6, 1.8]) |
|  4 |   9.6 |   2 | f([3.4, 9.6]) |

Поскольку мне нужно агрегировать все value на bin, я не могу использовать функцию withColumn для добавления этого нового столбца. Как лучше всего это сделать, пока пользовательские функции агрегирования не войдут в Spark?


person calstad    schedule 28.05.2015    source источник


Ответы (1)


Ниже код не тестируется, это всего лишь идея.

В Hive это можно сделать так, используя collect_list функция.

val newDF = sqlContext.sql(
    "select bin, collect_list() from aboveDF group by bin")

Следующие join aboveDF и newDF на корзине.

Это то, что вы ищите?

person Akash    schedule 28.05.2015
comment
Похоже, это сработает, но я надеялся, что мне не придется вызывать соединение. Спасибо! - person calstad; 29.05.2015