开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第三阶段:函数_窗口 2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12104
函数_窗口 2
内容介绍:
一、窗口函数
二、最优差值案例
一、窗口函数
1.窗口函数的逻辑
窗口函数从逻辑上来讲,窗口函数执行步骤大致可以分为如下几步
dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
窗口就是 dense_rank, over 是作用于哪个窗口,(PARTITION BY category ORDER BY revenue DESC)是窗口的定义,窗口的定义是先 PARTITION ,后 ORDER。
(1)根据 PARTITION BY category ,对数据进行分组
(2)分组后,根据 ORDER BY revenue DESC 对每一组数据进行排序
(3)在每一条数据到达窗口函数时,套入窗口内进行计算
窗口函数,其实是预定义一个规则,某条数据在具体到达时,会根据数据情况进行计算,比如:对 cellphone 数据集已经进行了排序和分区,之后对每条数据进行单独处理,在进行单独处理时,数据会套入分区里进行计算,然后生成结果集。
从语法的角度上讲,窗口函数大致分为两个部分:
dense_rank() 0VER(PARTITION BY category ORDER BY revenue DESC) as rank
·函数部分 dense_rank()
·窗口定义部分 PARTITION BY category ORDER BY revenue DESC
窗口函数和 GroupBy 最大的区别,就是 GroupBy 的聚合对每一个组只有一个结果,而窗口函数可以对每一条数据都有一个结果。窗口函数其实就是根据当前数据,计算其在所在的组中的统计数据。
2.窗口定义部分
dense_rank() oVER(PARTITION BY category ORDER BY revenue DESC) as rank
窗口定义部分有三个阶段,
(1)Partition 定义
控制哪些行会被放在一起,同时这个定义也类似于 Shuffle ,会将同一个分组的数据放在同一台机器中处理
(2)Order 定义
在一个分组内进行排序,因为很多操作,如 rank ,需要进行排序
(3)Frame 定义
释义:
.窗口函数会针对每一个组中的每一条数据进行统计聚合或者 rank,一个组又称为一个 Frame
.分组由两个字段控制,Partition 在整体上进行分组和分区
.而通过 Frame 可以通过当前行来更细粒度的分组控制
将多少数据放入窗口中是通过 Frame 定义
举例:例如公司每月销售额的数据,统计其同比增长率,那就需要把这条数据和前面一条数据进行结合计算了有哪些控制方式?
.Row Frame
通过"行号”来表示,可以定义前一条数据和后一条数据以及当前数据。
. Range Frame
通过某一个列的差值来表示
3.函数部分
dense_rank( ) oVER(PARTITION BY category ORDER BY revenue DESC) as rank
窗口定义完成之后就可以在窗口上作用排名函数、分析函数、聚合函数。
如下是支持的窗口函数:
类型 |
函数 |
解释 |
|||
排名函数 |
Rank |
.排名函数,计算当前数据在其 Frame 中的位置 .如果有重复,则重复项后面的行号会有空挡。就是如果前面排名有两个 1,那下一个排名就是 3. |
|||
Dense_rank |
和 rank 一样,但是结果中没有空挡,如果前面排名都为 1,下一个排名为 2。 |
||||
Row_number |
和 rank 一样,也是排名,但是不同点是即使有重复项,排名依然增长。 |
||||
分析函数 |
First_value |
获取这个组第一条数据 |
|||
last_value |
获取这个组最后一条数据 |
||||
lag |
lag(field,n)获取当前数据的 field 列向前 n 条数据 |
||||
lead |
lead(field,n)获取当前数据的 field 列向后 n 条数据 |
||||
聚合函数 |
* |
所有的 functions 中的聚合函数都支持 |
二、最优差值案例
1.需求介绍
源数据集:
productRevenue
product |
category |
revenue |
Thin |
Cell phone |
6000 |
Normal |
Tablet |
1500 |
Mini |
Tablet |
5500 |
UItra thin |
Cell phone |
5000 |
Very thin |
cell phone |
6000 |
Big |
Tablet |
2500 |
Bendable |
Cell phone |
3000 |
Foldable |
Cell phone |
3000 |
Pro |
Tablet |
4500 |
Pro2 |
Tablet |
6500 |
需求:统计每个商品和此品类最贵商品之间的差值,如 tablet 下坐高值是 6500,最低是 1500,差值就是 5000.
目标数据集:
product |
category |
revenue |
revenue_difference |
Pro2 |
Tablet |
6500 |
0 |
Mini |
Tablet |
5500 |
1000 |
Pro |
Tablet |
4500 |
2000 |
Big |
Tablet |
2500 |
4000 |
Normal |
Tablet |
1500 |
5000 |
Thin |
Cell Phone |
6000 |
0 |
Very thin |
Cell Phone |
6000 |
0 |
UItra thin |
Cell Phone |
5500 |
500 |
Foldable |
Cell Phone |
3000 |
3000 |
Bendable |
Cell Phone |
3000 |
3000 |
2.代码实现:
在 idea 中创建一个新的类,scala class 名称为 WindowFun1
package cn.itcast.spark.sql
object windowFun1 {
def main(args: Array[string]): unit = {
val spark = Sparksession. Builder
.appName( "window" )
.master( "local[6]").getorcreate(
import spark.implicits.
import org.apache.spark.sql.functions._
val data = Seq(
("Thin", "Cell phone" , 6000),
("Normal","Tablet".1500) ,
("Mini", "Tablet", 550o),
("Ultra thin", "Cell phone",5500),
("very thin". "Cell phone" ,6000) ,
("Big".“Tablet". 2500),
("Bendable","Cell phone" .3000) ,
("Foldable" , "cell phone",3000) ,
("Pro","Tablet", 4500) ,
("pro2","Tablet", 6500)
)
val source = data.toDF("product", "category", "revenue" )
//1.定义窗口,按照分类进行倒序进行排列
val window = window.partitionBy( ' category)
注:window 是 expressions 下的 window
.orderBy( 'revenue.desc)
//2.找到最贵的商品价格
val maxPrice: sql.column = max ( " revenue) over window、
注:函数应用的结果是 column 对象,可以直接在 select 中使用 column 对象。
//3.得到结果
source.select( cols = 'product, 'category, 'revenue,(maxPrice - 'revenue) as "revenue_difference")
.show()
}
这个案例要通过 max 求 window,window 是每一组中的最高价格,然后在进行 select 时要将价格相减。
运行得到结果:
结果没有任何问题。
窗口函数不是特别常用,但是要求掌握,窗口函数是比较特殊的一种计算方式,它有一个窗口,大家只要掌握窗口概念就能理解窗口函数。