Spark SQL中开窗函数详解

简介: 笔记

row_number()开窗函数: 其实就是给每个分组的数据,按照其排序的顺序,打上一个分组内的行号,相当于groupTopN,在实际应用中非常广泛。

+--------+-------+------+
|deptName|   name|salary|
+--------+-------+------+
|  dept-1|Michael|  3000|
|  dept-2|   Andy|  5000|
|  dept-1|   Alex|  4500|
|  dept-2| Justin|  6700|
|  dept-2| Cherry|  3400|
|  dept-1|   Jack|  5500|
|  dept-2|   Jone| 12000|
|  dept-1|   Lucy|  8000|
|  dept-2|   LiLi|  7600|
|  dept-2|   Pony|  4200|
+--------+-------+------+

需求分析:对上面数据表按照deptName分组,并按照salary降序排序,取出每个deptName组前两名。

数据源:

{"deptName":"dept-1", "name":"Michael", "salary":3000}
{"deptName":"dept-2", "name":"Andy", "salary":5000}
{"deptName":"dept-1", "name":"Alex", "salary":4500}
{"deptName":"dept-2", "name":"Justin", "salary":6700}
{"deptName":"dept-2", "name":"Cherry", "salary":3400}
{"deptName":"dept-1", "name":"Jack", "salary":5500}
{"deptName":"dept-2", "name":"Jone", "salary":12000}
{"deptName":"dept-1", "name":"Lucy", "salary":8000}
{"deptName":"dept-2", "name":"LiLi", "salary":7600}
{"deptName":"dept-2", "name":"Pony", "salary":4200}

初始化SparkSession

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

实现开窗函数

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 12:22 下午
 */
object WindowFunctionScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).show()
        /**
         * +--------+-------+------+
         * |deptName|   name|salary|
         * +--------+-------+------+
         * |  dept-1|Michael|  3000|
         * |  dept-2|   Andy|  5000|
         * |  dept-1|   Alex|  4500|
         * |  dept-2| Justin|  6700|
         * |  dept-2| Cherry|  3400|
         * |  dept-1|   Jack|  5500|
         * |  dept-2|   Jone| 12000|
         * |  dept-1|   Lucy|  8000|
         * |  dept-2|   LiLi|  7600|
         * |  dept-2|   Pony|  4200|
         * +--------+-------+------+
         */
        spark.read.json(userPath).createOrReplaceTempView("user")
        // 实现开窗函数:所谓开窗函数就是分组求TopN
        spark.sql("select deptName,name,salary,rank from" +
                "(select deptName,name,salary,row_number() OVER (PARTITION BY deptName order by salary desc) rank from user) tempUser " +
                "where rank <=2").show()
        /**
         * +--------+----+------+----+
         * |deptName|name|salary|rank|
         * +--------+----+------+----+
         * |  dept-1|Lucy|  8000|   1|
         * |  dept-1|Jack|  5500|   2|
         * |  dept-2|Jone| 12000|   1|
         * |  dept-2|LiLi|  7600|   2|
         * +--------+----+------+----+
         */
        // 实现分组排序
        spark.sql("select * from user order by deptName,salary desc").show()
        /**
         * +--------+-------+------+
         * |deptName|   name|salary|
         * +--------+-------+------+
         * |  dept-1|   Lucy|  8000|
         * |  dept-1|   Jack|  5500|
         * |  dept-1|   Alex|  4500|
         * |  dept-1|Michael|  3000|
         * |  dept-2|   Jone| 12000|
         * |  dept-2|   LiLi|  7600|
         * |  dept-2| Justin|  6700|
         * |  dept-2|   Andy|  5000|
         * |  dept-2|   Pony|  4200|
         * |  dept-2| Cherry|  3400|
         * +--------+-------+------+
         */
    }
}


相关文章
|
1月前
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
|
15天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
1月前
|
SQL 数据库 索引
SQL中COUNT函数结合条件使用的技巧与方法
在SQL查询中,COUNT函数是一个非常常用的聚合函数,用于计算表中满足特定条件的记录数
|
1月前
|
SQL 关系型数据库 MySQL
SQL日期函数
SQL日期函数
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
47 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
81 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
SQL 存储 数据库
SQL Server函数与存储过程 计算时间
SQL Server函数与存储过程 计算时间 一、通过一个开始时间、结束时间计算出一个工作日天数(不包含工作日与节假日);   1、函数 --创建函数,参数 @bengrq 开始时间,@endrq 结束时间 create function [dbo].
1786 0
|
SQL 存储 Perl
PL/SQL函数和存储过程
前言 活到老,学到老。 基本概念 --ORACLE 提供可以把PL/SQL 程序存储在数据库中,并可以在任何地方来运行它。这样就叫存储过程或函数。过程和函数统称为PL/SQL子程序,他们是被命名的PL/SQL块,均存储在数据库中,并通过输入、输出参数或输入/输出参数与其调用者交换信息。
1418 0

热门文章

最新文章

下一篇
无影云桌面