Spark SQL自定义函数 2

简介: Spark SQL自定义函数

2 开窗函数

2.1、概述

●介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

●聚合函数和开窗函数

聚合函数是将多行变成一行,count,avg…

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来


●开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

==2.排序开窗函数 ==

排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

2.2.准备工作

/export/servers/spark/bin/spark-shell

case class Score(name: String, clazz: Int, score: Int)
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.show()

查看数据结果:

+----+-----+-----+
|name|class|score|
+----+-----+-----+
|  a1|    1|   80|
|  a2|    1|   78|
|  a3|    1|   95|
|  a4|    2|   74|
|  a5|    2|   92|
|  a6|    3|   99|
|  a7|    3|   99|
|  a8|    3|   45|
|  a9|    3|   55|
| a10|    3|   78|
| a11|    3|  100|
+----+-----+-----+

2.3. 聚合开窗函数

●示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

SQL标准允许将所有聚合函数用做聚合开窗函数。

spark.sql("select  count(name)  from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show

结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|        11|
|  a2|    1|   78|        11|
|  a3|    1|   95|        11|
|  a4|    2|   74|        11|
|  a5|    2|   92|        11|
|  a6|    3|   99|        11|
|  a7|    3|   99|        11|
|  a8|    3|   45|        11|
|  a9|    3|   55|        11|
| a10|    3|   78|        11|
| a11|    3|  100|        11|
+----+-----+-----+----------+

●示例2

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。


开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。


下面的 SQL 语句用于显示按照班级分组后每组的人数:

OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|         3|
|  a2|    1|   78|         3|
|  a3|    1|   95|         3|
|  a6|    3|   99|         6|
|  a7|    3|   99|         6|
|  a8|    3|   45|         6|
|  a9|    3|   55|         6|
| a10|    3|   78|         6|
| a11|    3|  100|         6|
|  a4|    2|   74|         2|
|  a5|    2|   92|         2|
+----+-----+-----+----------+

2.4. 排序开窗函数

2.4.1 ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:

在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。

●示例1

只做排序,按成绩排序

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   5|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|  10|
| a11|    3|  100|  11|
+----+-----+-----+----+

分区并排序,按班级分区并按成绩排序

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.2 RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。


这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数


简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名


●示例2


按成绩跳跃排序

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() 

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
| a10|    3|   78|   4|
|  a2|    1|   78|   4|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|   9|
| a11|    3|  100|  11|
+----+-----+-----+----+

按班级分区,并按成绩排序

spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.3 DENSE_RANK连续排序

dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。


这个函数并列排名之后的排名是并列排名加1


简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名


●示例3

按成绩连续排序

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   4|
|  a1|    1|   80|   5|
|  a5|    2|   92|   6|
|  a3|    1|   95|   7|
|  a6|    3|   99|   8|
|  a7|    3|   99|   8|
| a11|    3|  100|   9|
+----+-----+-----+----+

按班级分区,并按照成绩排序

spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   5|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.4 NTILE分组排名

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

●示例4

按成绩连续排序

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   1|
|  a4|    2|   74|   2|
|  a2|    1|   78|   2|
| a10|    3|   78|   3|
|  a1|    1|   80|   3|
|  a5|    2|   92|   4|
|  a3|    1|   95|   4|
|  a6|    3|   99|   5|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
+----+-----+-----+----+

按班级分区,并按照成绩排序

spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+


目录
相关文章
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
97 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
113 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
79 0
|
3月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
97 0
|
SQL 消息中间件 分布式计算
通过Spark SQL实时归档SLS数据
我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。
2575 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
147 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
82 6