开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第三阶段:函数_窗口 1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12103
函数_窗口 1
内容介绍:
一、窗口函数
1.数据集、
2.需求分析
3.代码编写
一、窗口函数
1.数据集:
productRevenue
product |
category |
revenue |
Thin |
Cell phone |
6000 |
Normal |
Tablet |
1500 |
Mini |
Tablet |
5500 |
UItra thin |
Cell phone |
5000 |
Very thin |
cell phone |
6000 |
Big |
Tablet |
2500 |
Bendable |
Cell phone |
3000 |
Foldable |
Cell phone |
3000 |
Pro |
Tablet |
4500 |
Pro2 |
Tablet |
6500 |
product :商品名称 categroy :类别 revenue :收入
2.需求分析
需求:从数据集中得到每个类别收入第一的商品和收入第二的商品
关键点是,每个类别,收入前两名
product |
category |
revenue |
Pro2 |
Tablet |
6500 |
Mini |
Tablet |
5500 |
Thin |
Cell phone |
6000 |
Very thin |
cell phone |
6000 |
UItra thin |
Cell phone |
5000 |
方案 1:使用常见语法子查询
问题 1: Spark 和 Hive 这样的系统中,有自增主键吗?没有
问题 2:为什么分布式系统中很少见自增主键?因为分布式环境下数据在不同的节点中,很难保证顺序。解决方案:按照某一列去排序,取前两条数据
遗留问题:不容易在分组中取每一组的前两个
SELECT * FROM productRevenue ORDER BY revenue LIMIT 2
Limit 只能作用于整个数据集,不能作用于某一个分组。
方案 2:计算每一个类别的按照收入排序的序号,取每个类别中的前两个
productRevenue
product |
category |
revenue |
Thin |
Cell phone |
6000 |
Normal |
Tablet |
1500 |
Mini |
Tablet |
5500 |
UItra thin |
Cell phone |
5000 |
Very thin |
cell phone |
6000 |
Big |
Tablet |
2500 |
Bendable |
Cell phone |
3000 |
Foldable |
Cell phone |
3000 |
Pro |
Tablet |
4500 |
Pro2 |
Tablet |
6500 |
思路步骤:
①按照类别分组
②每个类别中的数据按照收入排序
③为排序过的数据增加编号
④取得每个类别中的前两个数据作为最终结果
使用 SQL 不太容易做到,需要一个语法,叫做窗口函数.
3.代码编写:
(1)创建初始环境
①创建新的类 windowFunction
②编写测试方法
③初始化 SparkSession
④创建数据集
class windowFunction {
@Test
def firstSecond() : unit = {
val spark = SparkSession.builder(
.appName ( "window" )
.master( "local[6]")
.getorcreate()
import spark.implicits._
val data = Seq(
("Thin" , "cell phone" , 6000 ) ,
("Normal" , "Tablet", 1500) ,
("Mini" , "Tablet", 5500) ,
("Ultra thin" , "Cell phone" , 5000) ,
("very thin" , "cell phone" , 6000) ,
("Big","Tablet",2500 ) ,
("Bendable" , "cell phone" .30oo) ,
( "Foldable" . "cell phone" , 3000) ,
( "Pro","Tablet", 4500) ,
).toDF("product", "category" , "revenue")
在 spark 当中,窗口函数的使用步骤:
//1.定义窗口
val window = window.partitionBy( cols = "category)
找到 org.apache.spark.sqLexpressions.Window
.orderBy ( ' revenue.desc)
注:需要找价格最高的,降序
//2.数据处理
import org.apache.spark.sql.functions._
注:如果要使用 dense,需要导入
org.apache.spark.sql.functions._
这个函数本质是进行排序,是在 window 中进行排序。
source.select( cols = ' product,'category, dense_rank() over window as “rank”)
注:分组函数定义完成之后要生成别名。
.where( condition = 'rank <= 2)
.show
}
}
运行代码得到结果:
窗口函数应用的技巧:先定义窗口,在窗口之上应用某一个函数为其作一些特定功能。
除了这种方式,还有其他方式完成窗口,如以下的 sql 语句:
source.createOrReplaceTempView( viewName = "productRevenue" )
spark.sq1( sqlText = "select product,category,revenue from "+
(select *, dense_rank() over (partition by category order by revenue desc) from productRevenue) "+
" where rank <= 2
注:第一个 from 内部要进行子查询,子查询的数据集中应该生成 dense_rank 列
.show()
不仅可以使用命令式的 API 完成,还能使用 Sql 语句完成,相对来说,sql 语句更为简易。
点击运行,但是出现报错,因为在代码中没有给出 rank,所以:
source.createOrReplaceTempView( viewName = "productRevenue" )
spark.sq1( sqlText = "select product,category,revenue from "+
(select *, dense_rank() over (partition by category order by revenue desc) as rank from productRevenue) "+
" where rank <= 2
.show()
再次运行:
由图可知:结果没有任何问题,两个类别的第一第二都呈现出来了。