我有一个Spark DataFrame,其列包含不是每行的值,而是仅针对某些行(在某种程度上有规律地,例如,基于id的每5到10行)。
现在,我想将一个窗口函数应用于包含值的行,这些行包含前两个行,这两行也包含值(所以基本上假装所有包含空值的行都不存在=不计入rowsBetween-range窗口)。实际上,我的有效窗口大小可以是任意的,具体取决于包含空值的行数。但是,我总是需要前后两个值。此外,最终结果应包含所有行,因为其他列包含重要信息。
例
例如,我想计算前两个的总和,即下面的数据帧中非空的行的当前和后两个(非空)值:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(id=i, val=i * 2 if i % 5 == 0 else None, foo='other') for i in range(100)])
df.show()
输出:
foo | id | val |
---|---|---|
other | 0 | 0 |
other | 1 | null |
other | 2 | null |
other | 3 | null |
other | 4 | null |
other | 5 | 10 |
other | 6 | null |
other | 7 | null |
other | 8 | null |
other | 9 | null |
other | 10 | 20 |
other | 11 | null |
other | 12 | null |
other | 13 | null |
other | 14 | null |
other | 15 | 30 |
other | 16 | null |
other | 17 | null |
other | 18 | null |
other | 19 | null |
如果我只是在数据帧上使用Window函数,我不能指定值不能为null的条件,因此窗口只包含空值,使得总和等于行值:
df2 = df.withColumn('around_sum', F.when(F.col('val').isNotNull(), F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id')))).otherwise(None))
df2.show()
结果:
foo | id | val | around_sum |
---|---|---|---|
other | 0 | 0 | 0 |
other | 1 | null | null |
other | 2 | null | null |
other | 3 | null | null |
other | 4 | null | null |
other | 5 | 10 | 10 |
other | 6 | null | null |
other | 7 | null | null |
other | 8 | null | null |
other | 9 | null | null |
other | 10 | 20 | 20 |
other | 11 | null | null |
other | 12 | null | null |
other | 13 | null | null |
other | 14 | null | null |
other | 15 | 30 | 30 |
other | 16 | null | null |
other | 17 | null | null |
other | 18 | null | null |
other | 19 | null | null |
通过创建仅包含值不为null的行的第二个数据帧,在那里执行窗口操作,然后再次连接结果,我能够实现所需的结果:
df3 = df.where(F.col('val').isNotNull())\
.withColumn('around_sum', F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))\
.select(F.col('around_sum'), F.col('id').alias('id2'))
df3 = df.join(df3, F.col('id') == F.col('id2'), 'outer').orderBy(F.col('id')).drop('id2')
df3.show()
结果:
foo | id | val | around_sum |
---|---|---|---|
other | 0 | 0 | 30 |
other | 1 | null | null |
other | 2 | null | null |
other | 3 | null | null |
other | 4 | null | null |
other | 5 | 10 | 60 |
other | 6 | null | null |
other | 7 | null | null |
other | 8 | null | null |
other | 9 | null | null |
other | 10 | 20 | 100 |
other | 11 | null | null |
other | 12 | null | null |
other | 13 | null | null |
other | 14 | null | null |
other | 15 | 30 | 150 |
other | 16 | null | null |
other | 17 | null | null |
other | 18 | null | null |
other | 19 | null | null |
现在我想知道我是否可以以某种方式摆脱连接(和第二个DataFrame),而是直接在Window函数中指定条件。
一个好的解决方案是从0填充空值开始,然后执行操作。仅在涉及的列上填充,如下所示:
df = df.fillna(0,subset=['val'])
如果您不确定是否要删除空值,请复制列值,然后计算该列上的窗口,以便在操作后删除它。
像这样:
df = df.withColumn('val2',F.col('val'))
df = df.fillna(0,subset=['val2'])
df = df.withColumn('around_sum', F.sum(F.col('val2')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
你好,我是AI助理
可以解答问题、推荐解决方案等
评论
全部评论 (0)