SQL、Pandas、Spark:窗口函数的3种实现

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 窗口函数是数据库查询中的一个经典场景,在解决某些特定问题时甚至是必须的。个人认为,在单纯的数据库查询语句层面【即不考虑DML、SQL调优、索引等进阶】,窗口函数可看作是考察求职者SQL功底的一个重要方面。前期个人以求职者身份参加面试时被问及窗口函数的问题,近期在作为面试官也提问过这一问题,但回答较为理想者居少。所以本文首先窗口函数进行讲解,然后分别从SQL、Pandas和Spark三种工具平台展开实现

640.png

模拟问题描述:

给定一组中学生的历次语文月考成绩表(每名学生含有4次成绩),需要实现以下3个需求:

  • 对每名学生的4次成绩表分别进行排序,排序后每人的成绩排名1-2-3-4
  • 求每名学生历次月考成绩的变化幅度,即本月较上个月的成绩差值
  • 求每名学生历次月考成绩中近3次平均分


数据表样例如下:


640.png


01 窗口函数介绍


在分析上述需求之前,首先对窗口函数进行介绍。何为窗口函数呢?既然窗口函数这个名字源于数据库,那么我们就援引其在数据库中的定义。下图源于MySQL8.0的官方文档,从标黄高亮的一句介绍可知:窗口函数是用与当前行有关的数据行参与计算。这个翻译可能有些蹩脚,但若能感性理解窗口函数的话,其实反而会觉得其概括的比较传神。


640.png

MySQL8.0官方手册中关于窗口函数的介绍


当然,为了形象表达上述定义所言何物,这里还是进一步给出一些配套插图以便于理解。在给出具体配图之前,首先要介绍与窗口函数相关的3个关键词:

  • partition by:用于对全量数据表进行切分(与SQL中的groupby功能类似,但功能完全不同),直接体现的是前面窗口函数定义中的“有关”,即切分到同一组的即为有关,否则就是无关;
  • order by:用于指定对partition后各组内的数据进行排序;
  • rows between:用于对切分后的数据进一步限定“有关”行的数量,此种情景下即使partition后分到一组,也可能是跟当前行的计算无关。


相应的,这3个关键字在前面的数据样表中可作如下配套解释:


640.png


当然,到这里还不是很理解窗口函数以及相应的3个关键字也问题不大,后续结合前述的三个实际需求再返过来看此图多半会豁然开朗。


上面是窗口函数的逻辑解释,那么具体能用于实现什么功能呢?其实,窗口函数能实现什么功能则要取决于能搭配什么函数。仍然引用MySQL8.0官方文档中的一幅图例:


640.png


其中,上表所述的窗口函数主要分为两大类:

  • 排序类,包括row_number、rank、dense_rank等,也包括percent_rank、cume_dist等分布排序类
  • 相对引用类,如lag、lead、first_value、last_value、nth_value等

除了这两类专用窗口函数之外,还有广义的聚合函数也可配套窗口函数使用,例如sum、avg、max、min等。


所以,现在来看前面提到的三个需求,就刚好是分别应用这三类窗口函数的例子。


哪有什么刚好,不过是特意设计而已


围绕这三个需求,下面分别应用SQL、Pandas和Spark三个工具予以实现。


02 SQL实现


既然窗口函数起源于数据库,那么下面就首先应用SQL予以实现。


注:以下所有SQL查询语句实现均基于MySQL8.0。


Q1:求解每名同学历次成绩的排名。

A1:由于是区分每名同学进行排序,所以需要依据uid字段进行partition;进一步地,按照成绩进行排序,所以order by字段即为score;最后,由于是要进行排名,所以配套函数选择row_number即可。注:row_number、rank和dense_rank的具体区别可参考历史文章:一文解决所有MySQL分类排名问题


查询语句及查询结果如下:


SELECT *, row_number() over(partition by uid order by score desc) as `rank` from score


640.png


Q2:求解每名同学历次月考成绩的差值,即本月成绩-上月成绩。

A2:首先,仍然是依据uid字段进行partition;而后由于是要计算本月成绩与上月成绩的差值,所以此次的排序依据应该是date;进一步地,由于要计算差值,所以需要对每次月考成绩计算其前一行的成绩(在按照uid进行切分并按照date排序后,上月成绩即为当前行的前一条记录),所以配套函数即为lag。


给出具体实现SQL语句及查询结果如下:


SELECT *, score - lag(score) over(partition by uid order by date) as score_diff from score


640.png


Q3:求解每名学生近3次月考成绩的平均分。

A3:在前两个需求的基础上,易见,仍然是依据uid进行partition、依据date进行排序,并选用avg聚合函数作为配套窗口函数。进一步地,由于此处限定计算近3次成绩的平均分,所以除了partition和order by 两个关键字外,还需增加rows between的限定。


具体SQL语句和查询结果如下:


SELECT *, avg(score) over(partition by uid order by date rows between 2 precedin


640.png


值得指出的是,对于每名学生,当切分窗口不足指定窗口大小(即目标行数)时会按实际的数据进行聚合,例如学生A,1月31日对应的近3次平均分即为本月成绩自身;2月28日对应近3次平均分即为本月成绩和上月成绩的平均分,而3月31日和4月30日计算的近3次平均分则为真正意义上的3次成绩均值。


03 Pandas实现


Q1:求解每名同学历次成绩的排名。

A1:虽然Pandas接口非常丰富,但用其实现分组排名貌似却并不方便。不过也是可以的。基本思路如下:首先仍然分别用uid和score字段进行分组和排序,而后通过对取值=1的常数列num进行cumsum,即累加,即可获取分组排名结果。其中,还可进一步应用assign函数实现链式调用,最终整个需求实现下来也是一行代码即可!


具体Pandas实现代码即结果如下:


df.assign(rank=df.assign(num=1).sort_values("score", ascending=False).groupby("uid")['num'].cumsum())


640.png


注:上述代码应用了assign实现链式调用,具体可参考文章Pandas用了一年,这3个函数是我的最爱……


Q2:求解每名同学历次月考成绩的差值,即本月成绩-上月成绩。

A2:对于这一特定需求,Pandas中实际上是内置了偏移函数shift,专门用于求解当前行的相对引用值。进一步地,对于求解差分结果,还可直接用diff实现,其中diff就相当于当前行-shift(1)。


两种API实现代码即执行结果分别如下:


# shift函数实现
df.assign(score_diff=df["score"]-df.sort_values("date").groupby("uid")['score'].shift(1))
# diff函数实现
df.assign(score_diff=df.sort_values("date").groupby("uid")['score'].diff(1))


640.png


Q3:求解每名学生近3次月考成绩的平均分。

A3:如果说前两个需求用Pandas实现都没有很好体现窗口函数的话,那么这个需求可能才更贴近Pandas中窗口函数的标准用法——那就是用关键字rolling。rolling原义即有滚动的意思,用在这里即表达滑动窗口的意思,所以自然也就可以设置滑动窗口的大小。至于SQL中窗口函数的另外两个关键字partition和order则仍然需要借助Pandas的sort_values和gropupby来实现。另外,与SQL中类似,这里仍然是要用求均值函数来做为配套。


具体Pandas实现代码如下:


df.assign(avg_score3=df.sort_values("date").groupby("uid").rolling(window=3,


640.png


注:上述实现中用到了reset_index和set_index,其目的是为了保证滑窗聚合后保证顺序不变。为了追求单行代码实现,这里的写法不再优雅,并不提倡。


04 Spark实现


最后,选用Spark予以实现。应该讲,Spark.sql组件几乎是完全对标SQL语法的实现,这在窗口函数中也例外,包括over以及paritionBy、orderBy和rowsbetween等关键字的使用上。


注:在使用Spark窗口函数前,首先需要求引入窗口函数类Window。即


import org.apache.spark.sql.expressions.Window



Q1:求解每名同学历次成绩的排名。

A1:直接沿用SQL思路即可,需要注意Spark中的相应表达。


代码实现及相应执行结果如下:


df.select($"uid", $"date", $"score", row_number().over(Window.partitionBy("uid").or


640.png


Q2:求解每名同学历次月考成绩的差值,即本月成绩-上月成绩。

A2:首先应用lag算子求出上月成绩,而后直接相减即可。


代码及执行结果如下:


df.select($"uid",$"date", $"score", ($"score"-lag($"score", 1).over(Window.partit


640.png


Q3:求解每名学生近3次月考成绩的平均分。

A3:仍然沿用SQL中思路即可,只需增加rowsBetween函数。


代码实现及执行结果如下:


df.select($"uid",$"date", $"score", avg("score").over(Window.partitionBy("uid")


640.png


05 小节


本文首先对窗口函数进行了介绍,通过模拟设定3个实际需求问题,分别基于SQL、Pandas和Spark三个工具平台予以分析和实现。总体来看,SQL和Spark实现窗口函数的方式和语法更为接近,而Pandas虽然拥有丰富的API,但对于具体窗口函数功能的实现上却不尽统一,而需灵活调用相应的函数。当然,窗口函数的功能还有很多,三个工具平台的使用也远不止这些,但其核心原理则是大体相通的。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
SQL 关系型数据库 MySQL
MYSQL根据查询结果删除sql 去除重复id 新增对比前一条与后一条数据 去重3种方法​ 窗口函数
MYSQL根据查询结果删除sql 去除重复id 新增对比前一条与后一条数据 去重3种方法​ 窗口函数
|
2月前
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
37 1
|
2月前
|
SQL 分布式计算 数据库
Spark SQL
Spark SQL
42 1
|
24天前
|
SQL JSON 分布式计算
|
24天前
|
SQL 分布式计算 Java
|
2月前
|
SQL 存储 分布式计算
spark执行sql的原理是什么
spark执行sql的原理是什么
41 1
|
2月前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
2月前
|
SQL Java API
Flink SQL 问题之窗口函数报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
|
12天前
|
数据采集 机器学习/深度学习 数据可视化
利用Python和Pandas库构建高效的数据分析流程
在数据驱动的时代,数据分析已成为企业决策的关键环节。本文介绍如何利用Python编程语言及其强大的数据分析库Pandas,构建一套高效且可扩展的数据分析流程。与常规的数据分析流程不同,本文不仅涵盖数据加载、清洗、转换等基础步骤,还强调数据可视化、模型探索与评估等高级分析技巧,并通过实际案例展示如何在Python中实现这些步骤,为数据分析师提供一套完整的数据分析解决方案。
|
20天前
|
Python
在Python的pandas库中,向DataFrame添加新列简单易行
【6月更文挑战第15天】在Python的pandas库中,向DataFrame添加新列简单易行。可通过直接赋值、使用Series或apply方法实现。例如,直接赋值可将列表或Series对象分配给新列;使用Series可基于现有列计算生成新列;apply方法则允许应用自定义函数到每一行或列来创建新列。
89 8