Проблема с вычислением оконной функции pyspark с помощью метода avg

У меня есть входной фрейм данных, как показано ниже:

partner_id|month_id|value1 |value2
1001      |  01    |10     |20    
1002      |  01    |20     |30    
1003      |  01    |30     |40
1001      |  02    |40     |50    
1002      |  02    |50     |60    
1003      |  02    |60     |70
1001      |  03    |70     |80    
1002      |  03    |80     |90    
1003      |  03    |90     |100

Используя приведенный ниже код, я создал два новых столбца, которые вычисляют среднее значение с помощью оконной функции:

rnum = (Window.partitionBy("partner_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
df = df.withColumn("value1_1", F.avg("value1").over(rnum))
df = df.withColumn("value1_2", F.avg("value2").over(rnum))

Выход:

partner_id|month_id|value1 |value2|value1_1|value2_2
1001      |  01    |10     |20    |10      |20
1002      |  01    |20     |30    |20      |30
1003      |  01    |30     |40    |30      |40
1001      |  02    |40     |50    |25      |35
1002      |  02    |50     |60    |35      |45
1003      |  02    |60     |70    |45      |55
1001      |  03    |70     |80    |40      |50
1002      |  03    |80     |90    |50      |60
1003      |  03    |90     |100   |60      |70

Совокупное среднее значение хорошо работает для столбцов value1 и value2 с использованием функции pyspark Window. Но, если мы пропустим данные за один месяц во входных данных, как показано ниже, для следующего месяца средний расчет должен происходить на основе номера месяца. вместо нормального среднего. Например, если ввод такой, как показано ниже (данные за месяц 02 отсутствуют)

partner_id|month_id|value1 |value2
1001      |  01    |10     |20    
1002      |  01    |20     |30    
1003      |  01    |30     |40
1001      |  03    |70     |80    
1002      |  03    |80     |90    
1003      |  03    |90     |100

Затем происходит вычисление среднего значения для трех записей за месяц, как показано ниже: например: (70 + 10) / 2 Но, как правильно рассчитать среднее значение, если определенные значения месяца отсутствуют ???


person Rocky1989    schedule 22.07.2020    source источник
comment
Можете ли вы показать неправильный результат и какой вы ожидаете?   -  person Steven    schedule 22.07.2020


Ответы (2)


если вы используете Spark 2.4+. вы можете использовать функцию последовательности и функции массива. Это решение основано на этой ссылке.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w= Window().partitionBy("partner_id")

    df1 =df.withColumn("month_seq", F.sequence(F.min("month_id").over(w), F.max("month_id").over(w), F.lit(1)))\
        .groupBy("partner_id").agg(F.collect_list("month_id").alias("month_id"), F.collect_list("value1").alias("value1"), F.collect_list("value2").alias("value2")
         ,F.first("month_seq").alias("month_seq")).withColumn("month_seq", F.array_except("month_seq","month_id"))\
        .withColumn("month_id",F.flatten(F.array("month_id","month_seq"))).drop("month_seq")\
        .withColumn("zip", F.explode(F.arrays_zip("month_id","value1", "value2"))) \
        .select("partner_id", "zip.month_id", F.when(F.col("zip.value1").isNull() , \
                                          F.lit(0)).otherwise(F.col("zip.value1")).alias("value1"),
                                          F.when(F.col("zip.value2").isNull(), F.lit(0)).otherwise(F.col("zip.value2")
                                                                                         ).alias("value2")).orderBy("month_id")

    rnum = (Window.partitionBy("partner_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))

    df2 = df1.withColumn("value1_1", F.avg("value1").over(rnum)).withColumn("value1_2", F.avg("value2").over(rnum))

    df2.show()

    # +----------+--------+------+------+------------------+------------------+
    # |partner_id|month_id|value1|value2|          value1_1|          value1_2|
    # +----------+--------+------+------+------------------+------------------+
    # |      1002|       1|    10|    20|              10.0|              20.0|
    # |      1002|       2|     0|     0|               5.0|              10.0|
    # |      1002|       3|    80|    90|              30.0|36.666666666666664|
    # |      1001|       1|    10|    10|              10.0|              10.0|
    # |      1001|       2|     0|     0|               5.0|               5.0|
    # |      1001|       3|    70|    80|26.666666666666668|              30.0|
    # |      1003|       1|    30|    40|              30.0|              40.0|
    # |      1003|       2|     0|     0|              15.0|              20.0|
    # |      1003|       3|    90|   100|              40.0|46.666666666666664|
    # +----------+--------+------+------+------------------+------------------+
person kites    schedule 22.07.2020
comment
если вам не нужен второй месяц из набора результатов. вы можете удалить те строки, которые имеют значение 1 или значение 2 = 0 - person kites; 23.07.2020

Spark недостаточно сообразителен, чтобы понять, что не хватает одного месяца, поскольку он даже не знает, сколько будет, вероятно, месяц.

Если вы хотите, чтобы недостающий месяц был включен в вычисление среднего значения, вам необходимо сгенерировать недостающие данные.

Просто выполните полное внешнее соединение с фреймом данных [month_id, defaultValue], где month_id - это значения от 1 до 12 и defaultValue = 0.


Другое решение: вместо вычисления среднего вы вычисляете сумму значений и делите их на номер месяца.

person Steven    schedule 22.07.2020
comment
Благодарю. Можем ли мы создать такое же количество фиктивных записей, как и в другие месяцы ??? - person Rocky1989; 22.07.2020