The row_number() window function assigns each row a sequential number within its group, following the group's sort order. It is essentially a group-wise top-N (groupTopN) operation and is very widely used in practice.
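In SQL the pattern is always the same: number the rows with row_number() OVER (PARTITION BY ... ORDER BY ...) in a subquery, then filter on that number. A minimal sketch, assuming a SparkSession named spark and a registered temp view t with a grouping column g and a sort column s (all hypothetical names, not part of the example below):

// Hypothetical view t: keep the top two rows of each group g, ranked by s descending
spark.sql(
    "select g, s, rn from " +
    "(select g, s, row_number() over (partition by g order by s desc) rn from t) ranked " +
    "where rn <= 2").show()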
+--------+-------+------+
|deptName|   name|salary|
+--------+-------+------+
|  dept-1|Michael|  3000|
|  dept-2|   Andy|  5000|
|  dept-1|   Alex|  4500|
|  dept-2| Justin|  6700|
|  dept-2| Cherry|  3400|
|  dept-1|   Jack|  5500|
|  dept-2|   Jone| 12000|
|  dept-1|   Lucy|  8000|
|  dept-2|   LiLi|  7600|
|  dept-2|   Pony|  4200|
+--------+-------+------+
Requirement: group the table above by deptName, sort each group by salary in descending order, and return the top two rows of each deptName group.
Data source:
{"deptName":"dept-1", "name":"Michael", "salary":3000} {"deptName":"dept-2", "name":"Andy", "salary":5000} {"deptName":"dept-1", "name":"Alex", "salary":4500} {"deptName":"dept-2", "name":"Justin", "salary":6700} {"deptName":"dept-2", "name":"Cherry", "salary":3400} {"deptName":"dept-1", "name":"Jack", "salary":5500} {"deptName":"dept-2", "name":"Jone", "salary":12000} {"deptName":"dept-1", "name":"Lucy", "salary":8000} {"deptName":"dept-2", "name":"LiLi", "salary":7600} {"deptName":"dept-2", "name":"Pony", "salary":4200}
Initialize the SparkSession
package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {

    def getSparkSession(): SparkSession = {
        val spark = SparkSession
            .builder
            .appName("CommSparkSessionScala")
            .master("local")
            .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
            .getOrCreate()
        spark
    }
}
Implement the window function
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 12:22 PM
 */
object WindowFunctionScala {

    def main(args: Array[String]): Unit = {

        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).show()

        /**
         * +--------+-------+------+
         * |deptName|   name|salary|
         * +--------+-------+------+
         * |  dept-1|Michael|  3000|
         * |  dept-2|   Andy|  5000|
         * |  dept-1|   Alex|  4500|
         * |  dept-2| Justin|  6700|
         * |  dept-2| Cherry|  3400|
         * |  dept-1|   Jack|  5500|
         * |  dept-2|   Jone| 12000|
         * |  dept-1|   Lucy|  8000|
         * |  dept-2|   LiLi|  7600|
         * |  dept-2|   Pony|  4200|
         * +--------+-------+------+
         */

        spark.read.json(userPath).createOrReplaceTempView("user")

        // Window function: number the rows within each deptName group by
        // salary (descending) and keep the top two rows per group.
        spark.sql("select deptName,name,salary,rank from " +
            "(select deptName,name,salary,row_number() OVER (PARTITION BY deptName order by salary desc) rank from user) tempUser " +
            "where rank <= 2").show()

        /**
         * +--------+----+------+----+
         * |deptName|name|salary|rank|
         * +--------+----+------+----+
         * |  dept-1|Lucy|  8000|   1|
         * |  dept-1|Jack|  5500|   2|
         * |  dept-2|Jone| 12000|   1|
         * |  dept-2|LiLi|  7600|   2|
         * +--------+----+------+----+
         */

        // Plain group-wise sort, for comparison with the windowed query above
        spark.sql("select * from user order by deptName,salary desc").show()

        /**
         * +--------+-------+------+
         * |deptName|   name|salary|
         * +--------+-------+------+
         * |  dept-1|   Lucy|  8000|
         * |  dept-1|   Jack|  5500|
         * |  dept-1|   Alex|  4500|
         * |  dept-1|Michael|  3000|
         * |  dept-2|   Jone| 12000|
         * |  dept-2|   LiLi|  7600|
         * |  dept-2| Justin|  6700|
         * |  dept-2|   Andy|  5000|
         * |  dept-2|   Pony|  4200|
         * |  dept-2| Cherry|  3400|
         * +--------+-------+------+
         */
    }
}
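The same top-two query can also be written with the DataFrame API instead of a SQL string, using Spark's built-in Window spec. A minimal sketch, assuming spark and userPath hold the same values as in WindowFunctionScala above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Same semantics as the SQL version: partition by deptName,
// order by salary descending, number the rows, keep the top two.
val byDeptSalaryDesc = Window.partitionBy("deptName").orderBy(desc("salary"))

spark.read.json(userPath)
    .withColumn("rank", row_number().over(byDeptSalaryDesc))
    .where(col("rank") <= 2)
    .show()

Both forms should produce the same result; choosing between the SQL string and the DataFrame API is largely a matter of style.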