开发者社区> 问答> 正文
1
0
分享

Window.rowsBetween - 仅考虑满足特定条件的行(例如,不为null)

我有一个Spark DataFrame,其列包含不是每行的值,而是仅针对某些行(在某种程度上有规律地,例如,基于id的每5到10行)。

现在,我想将一个窗口函数应用于包含值的行,这些行包含前两个行,这两行也包含值(所以基本上假装所有包含空值的行都不存在=不计入rowsBetween-range窗口)。实际上,我的有效窗口大小可以是任意的,具体取决于包含空值的行数。但是,我总是需要前后两个值。此外,最终结果应包含所有行,因为其他列包含重要信息。


例如,我想计算前两个的总和,即下面的数据帧中非空的行的当前和后两个(非空)值:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=i, val=i * 2 if i % 5 == 0 else None, foo='other') for i in range(100)])
df.show()
输出:

fooidval
other00
other1null
other2null
other3null
other4null
other510
other6null
other7null
other8null
other9null
other1020
other11null
other12null
other13null
other14null
other1530
other16null
other17null
other18null
other19null

如果我只是在数据帧上使用Window函数,我不能指定值不能为null的条件,因此窗口只包含空值,使得总和等于行值:

df2 = df.withColumn('around_sum', F.when(F.col('val').isNotNull(), F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id')))).otherwise(None))
df2.show()
结果:

fooidvalaround_sum
other000
other1nullnull
other2nullnull
other3nullnull
other4nullnull
other51010
other6nullnull
other7nullnull
other8nullnull
other9nullnull
other102020
other11nullnull
other12nullnull
other13nullnull
other14nullnull
other153030
other16nullnull
other17nullnull
other18nullnull
other19nullnull

通过创建仅包含值不为null的行的第二个数据帧,在那里执行窗口操作,然后再次连接结果,我能够实现所需的结果:

df3 = df.where(F.col('val').isNotNull())\

.withColumn('around_sum', F.sum(F.col('val')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))\
.select(F.col('around_sum'), F.col('id').alias('id2'))

df3 = df.join(df3, F.col('id') == F.col('id2'), 'outer').orderBy(F.col('id')).drop('id2')
df3.show()
结果:

fooidvalaround_sum
other0030
other1nullnull
other2nullnull
other3nullnull
other4nullnull
other51060
other6nullnull
other7nullnull
other8nullnull
other9nullnull
other1020100
other11nullnull
other12nullnull
other13nullnull
other14nullnull
other1530150
other16nullnull
other17nullnull
other18nullnull
other19nullnull

现在我想知道我是否可以以某种方式摆脱连接(和第二个DataFrame),而是直接在Window函数中指定条件。

展开
收起
社区小助手 2019-01-02 15:10:43 5179 0
举报
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    一个好的解决方案是从0填充空值开始,然后执行操作。仅在涉及的列上填充,如下所示:

    df = df.fillna(0,subset=['val'])
    如果您不确定是否要删除空值,请复制列值,然后计算该列上的窗口,以便在操作后删除它。

    像这样:

    df = df.withColumn('val2',F.col('val'))
    df = df.fillna(0,subset=['val2'])

    Then perform the operations over val2.

    df = df.withColumn('around_sum', F.sum(F.col('val2')).over(Window.rowsBetween(-2, 2).orderBy(F.col('id'))))

    After the operations, get rid of the copy column

    2019-07-17 23:24:25 举报
    赞同 评论 打赏

    评论

    全部评论 (0)

    登录后可评论
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等