Window.rowsBetween — рассматривать только строки, удовлетворяющие определенному условию (например, не являющиеся нулевыми)

Проблема

У меня есть Spark DataFrame со столбцом, который содержит значения не для каждой строки, а только для некоторых строк (несколько регулярно, например, только каждые 5-10 строк на основе идентификатора).

Теперь я хотел бы применить оконную функцию к строкам, содержащим значения, включающие две предыдущие и две следующие строки, которые также содержат значения (по сути, притворяясь, что все строки, содержащие нулевые значения, не существуют = don не засчитывается в rowsBetween-диапазон окна). На практике мой эффективный размер окна может быть произвольным в зависимости от того, сколько существует строк, содержащих нулевые значения. Однако мне всегда нужно ровно два значения до и после. Кроме того, конечный результат должен содержать все строки из-за других столбцов, содержащих важную информацию.

Пример

Например, я хочу вычислить сумму по двум предыдущим, текущему и двум следующим (ненулевым) значениям для строк в следующем кадре данных, которые не равны нулю:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=i, val=i * 2 if i % 5 == 0 else None, foo='other') for i in range(100)])
df.show()

Выход:

+-----+---+----+
|  foo| id| val|
+-----+---+----+
|other|  0|   0|
|other|  1|null|
|other|  2|null|
|other|  3|null|
|other|  4|null|
|other|  5|  10|
|other|  6|null|
|other|  7|null|
|other|  8|null|
|other|  9|null|
|other| 10|  20|
|other| 11|null|
|other| 12|null|
|other| 13|null|
|other| 14|null|
|other| 15|  30|
|other| 16|null|
|other| 17|null|
|other| 18|null|
|other| 19|null|
+-----+---+----+

Если я просто использую функцию Window для фрейма данных как есть, я не могу указать условие, что значения не должны быть нулевыми, поэтому окно содержит только нулевые значения, что делает сумму равной значению строки:

df2 = df.withColumn('around_sum', F.when(F.col('val').isNotNull(), F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id')))).otherwise(None))
df2.show()

Результат:

+-----+---+----+----------+
|  foo| id| val|around_sum|
+-----+---+----+----------+
|other|  0|   0|         0|
|other|  1|null|      null|
|other|  2|null|      null|
|other|  3|null|      null|
|other|  4|null|      null|
|other|  5|  10|        10|
|other|  6|null|      null|
|other|  7|null|      null|
|other|  8|null|      null|
|other|  9|null|      null|
|other| 10|  20|        20|
|other| 11|null|      null|
|other| 12|null|      null|
|other| 13|null|      null|
|other| 14|null|      null|
|other| 15|  30|        30|
|other| 16|null|      null|
|other| 17|null|      null|
|other| 18|null|      null|
|other| 19|null|      null|
+-----+---+----+----------+

Я смог добиться желаемого результата, создав второй кадр данных, содержащий только строки, где значение не равно нулю, выполнив там оконную операцию, а затем снова присоединив результат:

df3 = df.where(F.col('val').isNotNull())\
    .withColumn('around_sum', F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))\
    .select(F.col('around_sum'), F.col('id').alias('id2'))
df3 = df.join(df3, F.col('id') == F.col('id2'), 'outer').orderBy(F.col('id')).drop('id2')
df3.show()

Результат:

+-----+---+----+----------+
|  foo| id| val|around_sum|
+-----+---+----+----------+
|other|  0|   0|        30|
|other|  1|null|      null|
|other|  2|null|      null|
|other|  3|null|      null|
|other|  4|null|      null|
|other|  5|  10|        60|
|other|  6|null|      null|
|other|  7|null|      null|
|other|  8|null|      null|
|other|  9|null|      null|
|other| 10|  20|       100|
|other| 11|null|      null|
|other| 12|null|      null|
|other| 13|null|      null|
|other| 14|null|      null|
|other| 15|  30|       150|
|other| 16|null|      null|
|other| 17|null|      null|
|other| 18|null|      null|
|other| 19|null|      null|
+-----+---+----+----------+

Вопрос

Теперь мне интересно, могу ли я как-то избавиться от соединения (и второго DataFrame) и вместо этого напрямую указать условие в функции окна.

Это возможно?


person Matthias    schedule 20.11.2018    source источник
comment
Вы думали с fillna() ?   -  person Bala    schedule 20.11.2018
comment
@Bala: Не уверен, как я мог бы использовать fillna() для этой цели - я действительно не хочу заполнять нулевые значения. Я также рассматривал использование last/first, но пока не нашел решения, которое может обрабатывать более +/- одного значения (но в этом случае мне нужно два).   -  person Matthias    schedule 21.11.2018


Ответы (1)


Хорошим решением будет начать с заполнения нулей нулями, а затем выполнить операции. Выполните fillna только для задействованного столбца, например:

df = df.fillna(0,subset=['val'])

Если вы не уверены, хотите ли вы избавиться от нулей, скопируйте значение столбца, а затем рассчитайте окно для этого столбца, чтобы вы могли избавиться от него после операции.

Как это:

df = df.withColumn('val2',F.col('val'))
df = df.fillna(0,subset=['val2'])
# Then perform the operations over val2.
df = df.withColumn('around_sum', F.sum(F.col('val2')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))
# After the operations, get rid of the copy column
df = df.drop('val2')
person Manrique    schedule 27.11.2018
comment
Спасибо за ответ! Однако результат сильно отличается от моего ожидаемого результата: когда я запускаю его, значение around_sum == для моего примера фрейма данных. - person Matthias; 27.11.2018