Spark - Создание вложенного фрейма данных

Я начинаю с PySpark, и у меня проблемы с созданием DataFrames с вложенными объектами.

Это мой пример.

У меня есть пользователи.

$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}

У пользователей есть заказы.

$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

И мне нравится присоединяться к нему, чтобы получить такую ​​структуру, в которой заказы представляют собой массив, вложенный в пользователей.

$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

Как я могу это сделать ? Есть ли какое-либо вложенное соединение или что-то подобное?

>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> order =  sqlContext.read.json("order.json")
>>> order.printSchema();
root
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- userid: long (nullable = true)

>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

РЕДАКТИРОВАТЬ: Я знаю, что есть возможность сделать это с помощью join и foldByKey, но есть ли более простой способ?

EDIT2: я использую решение от @ zero323

def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType = "left_outer"):
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")

Добавляю вторую вложенную структуру 'lines'

>>> lines =  sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
 |-- id: long (nullable = true)
 |-- orderid: long (nullable = true)
 |-- product: string (nullable = true)

orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _1: long (nullable = true)
 |    |    |    |    |-- _2: long (nullable = true)
 |    |    |    |    |-- _3: string (nullable = true)

После этого имена столбцов из строк теряются. Любые идеи ?

РЕДАКТИРОВАТЬ 3: я пытался вручную указать схему.

from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print new_schema

grouped =  lines.rdd.groupBy(lambda r: r.orderid)
grouped =  grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)

Ошибка:

TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>

person Maciek Bryński    schedule 10.08.2015    source источник


Ответы (3)


Это будет работать только в Spark 2.0 или новее

Для начала нам понадобится пара импортированных файлов:

from pyspark.sql.functions import struct, collect_list

Остальное - простая агрегация и присоединение:

orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")

combined = users.join(
    orders
        .groupBy("userId")
        .agg(collect_list(struct(*orders.columns)).alias("orders"))
        .withColumnRenamed("userId", "id"), ["id"])

Для данных примера результат:

combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders                     |
+---+-----+---------------------------+
|1  |UserA|[[1,202.3,1], [2,343.99,1]]|
|2  |UserB|[[3,399.99,2]]             |
+---+-----+---------------------------+

со схемой:

combined.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

и представление JSON:

for x in combined.toJSON().collect():
    print(x)     
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
person zero323    schedule 11.12.2016
comment
Обратите внимание, что причина, по которой это работает, заключается в том, что даже несмотря на то, что количество пользователей велико (поскольку оно находится во фрейме данных), количество заказов для конкретного пользователя достаточно мало, чтобы их можно было хранить в коллекции. Что, если бы это было не так? Что, если бы заказы были чем-то другим, ради аргументации, пользователями с таким же цветом волос, которые не могут быть собраны в коллекцию? Придется ли вам собирать цвета волос и последовательно просматривать их, или вы могли бы использовать декартово соединение? - person oneirois; 09.07.2018
comment
@oneirois Короче - вся идея не будет жизнеспособной. Строка - это минимальная единица параллелизма, которую нельзя разделить или частично разделить. Вы можете просто использовать DataFrameWriter, чтобы иметь отдельный файл для каждого фактора группирования, не создавая вложенной структуры - это будет намного лучше масштабироваться. - person zero323; 10.07.2018
comment
Именно этим я и закончил :) - person oneirois; 14.07.2018

Во-первых, вам нужно использовать userid как ключ соединения для второго DataFrame:

user.join(order, user.id == order.userid)

Затем вы можете использовать шаг map для преобразования результирующих записей в желаемый формат.

person Dean Wampler    schedule 10.08.2015
comment
Не совсем. Map недостаточно. Если я присоединюсь к пользователям и заказам, у меня будет 3 записи. (а я хочу только 2). Так что мне тоже нужна какая-то агрегация (например, foldByKey) - person Maciek Bryński; 10.08.2015

Для преобразования вложенного фрейма данных в нормальное использование

dff= df.select("column with multiple columns.*").toPandas()

person Pavan Purohit    schedule 29.05.2019
comment
Пожалуйста, не используйте Pandas! Это вызовет Spark collect ()! Это очень медленно и не распределяется, потому что все данные будут возвращены в одну точку, которая является драйвером Spark. - person prossblad; 02.10.2019