Spark SQL自定义函数 2

简介: Spark SQL自定义函数

2 开窗函数

2.1、概述

●介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

●聚合函数和开窗函数

聚合函数是将多行变成一行,count,avg…

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来


●开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

==2.排序开窗函数 ==

排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

2.2.准备工作

/export/servers/spark/bin/spark-shell

case class Score(name: String, clazz: Int, score: Int)
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.show()

查看数据结果:

+----+-----+-----+
|name|class|score|
+----+-----+-----+
|  a1|    1|   80|
|  a2|    1|   78|
|  a3|    1|   95|
|  a4|    2|   74|
|  a5|    2|   92|
|  a6|    3|   99|
|  a7|    3|   99|
|  a8|    3|   45|
|  a9|    3|   55|
| a10|    3|   78|
| a11|    3|  100|
+----+-----+-----+

2.3. 聚合开窗函数

●示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

SQL标准允许将所有聚合函数用做聚合开窗函数。

spark.sql("select  count(name)  from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show

结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|        11|
|  a2|    1|   78|        11|
|  a3|    1|   95|        11|
|  a4|    2|   74|        11|
|  a5|    2|   92|        11|
|  a6|    3|   99|        11|
|  a7|    3|   99|        11|
|  a8|    3|   45|        11|
|  a9|    3|   55|        11|
| a10|    3|   78|        11|
| a11|    3|  100|        11|
+----+-----+-----+----------+

●示例2

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。


开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。


下面的 SQL 语句用于显示按照班级分组后每组的人数:

OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

+----+-----+-----+----------+                                                   
|name|class|score|name_count|
+----+-----+-----+----------+
|  a1|    1|   80|         3|
|  a2|    1|   78|         3|
|  a3|    1|   95|         3|
|  a6|    3|   99|         6|
|  a7|    3|   99|         6|
|  a8|    3|   45|         6|
|  a9|    3|   55|         6|
| a10|    3|   78|         6|
| a11|    3|  100|         6|
|  a4|    2|   74|         2|
|  a5|    2|   92|         2|
+----+-----+-----+----------+

2.4. 排序开窗函数

2.4.1 ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:

在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。

●示例1

只做排序,按成绩排序

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   5|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|  10|
| a11|    3|  100|  11|
+----+-----+-----+----+

分区并排序,按班级分区并按成绩排序

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.2 RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。


这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数


简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名


●示例2


按成绩跳跃排序

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() 

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
| a10|    3|   78|   4|
|  a2|    1|   78|   4|
|  a1|    1|   80|   6|
|  a5|    2|   92|   7|
|  a3|    1|   95|   8|
|  a6|    3|   99|   9|
|  a7|    3|   99|   9|
| a11|    3|  100|  11|
+----+-----+-----+----+

按班级分区,并按成绩排序

spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.3 DENSE_RANK连续排序

dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。


这个函数并列排名之后的排名是并列排名加1


简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名


●示例3

按成绩连续排序

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
|  a4|    2|   74|   3|
|  a2|    1|   78|   4|
| a10|    3|   78|   4|
|  a1|    1|   80|   5|
|  a5|    2|   92|   6|
|  a3|    1|   95|   7|
|  a6|    3|   99|   8|
|  a7|    3|   99|   8|
| a11|    3|  100|   9|
+----+-----+-----+----+

按班级分区,并按照成绩排序

spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   4|
| a11|    3|  100|   5|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+

2.4.4 NTILE分组排名

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

●示例4

按成绩连续排序

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

结果如下所示:

+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
|  a8|    3|   45|   1|
|  a9|    3|   55|   1|
|  a4|    2|   74|   2|
|  a2|    1|   78|   2|
| a10|    3|   78|   3|
|  a1|    1|   80|   3|
|  a5|    2|   92|   4|
|  a3|    1|   95|   4|
|  a6|    3|   99|   5|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
+----+-----+-----+----+

按班级分区,并按照成绩排序

spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

结果如下:

+----+-----+-----+----+                                                         
|name|class|score|rank|
+----+-----+-----+----+
|  a2|    1|   78|   1|
|  a1|    1|   80|   2|
|  a3|    1|   95|   3|
|  a8|    3|   45|   1|
|  a9|    3|   55|   2|
| a10|    3|   78|   3|
|  a6|    3|   99|   4|
|  a7|    3|   99|   5|
| a11|    3|  100|   6|
|  a4|    2|   74|   1|
|  a5|    2|   92|   2|
+----+-----+-----+----+


目录
相关文章
|
4天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
4月前
|
SQL Java 流计算
Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
【1月更文挑战第1天】【1月更文挑战第2篇】Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
97 0
|
4月前
|
存储 SQL 分布式计算
性能优化:Spark SQL中的谓词下推和列式存储
性能优化:Spark SQL中的谓词下推和列式存储
|
4月前
|
SQL 分布式计算 测试技术
使用UDF扩展Spark SQL
使用UDF扩展Spark SQL
|
4月前
|
SQL 数据采集 分布式计算
Spark SQL中的聚合与窗口函数
Spark SQL中的聚合与窗口函数
|
4月前
|
SQL JSON 分布式计算
Spark SQL简介与基本用法
Spark SQL简介与基本用法
|
4月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
5月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
34 0
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
162 0
|
28天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。