4.4 聚合函数
在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就
是其中聚合函数的使用。
4.4.1 集合中聚合函数
回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列
表List中聚合函数reduce和fold源码如下:
通过代码,看看列表List中聚合函数使用:
运行截图如下所示:
fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:
聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:
4.4.2 RDD 中聚合函数
在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:
案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:
运行原理分析:
使用RDD中fold聚合函数:
查看RDD中高级聚合函数aggregate,函数声明如下:
业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:
核心业务代码如下:
运行结果原理剖析示意图:
上述完整范例演示代码:
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext, TaskContext} import scala.collection.mutable import scala.collection.mutable.ListBuffer /** * RDD中聚合函数:reduce、aggregate函数 */ object SparkAggTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // 模拟数据,1 到 10 的列表,通过并行方式创建RDD val datas = 1 to 10 val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2) // 查看每个分区中的数据 datasRDD.foreachPartition { iter => println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}") } println("=========================================") // 使用reduce函数聚合 val result: Int = datasRDD.reduce((tmp, item) => { println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item") tmp + item }) println(result) println("=========================================") // 使用fold函数聚合 val result2: Int = datasRDD.fold(0)((tmp, item) => { println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item") tmp + item }) println(result2) println("=========================================") // 使用aggregate函数获取最大的两个值 val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())( // 分区内聚合函数,每个分区内数据如何聚合 seqOp: (U, T) => U, (u, t) => { println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t") // 将元素加入到列表中 u += t // // 降序排序 val top = u.sorted.takeRight(2) // 返回 top }, // 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U (u1, u2) => { println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2") u1 ++= u2 // 将列表的数据合并,到u1中 // u1.sorted.takeRight(2) } ) println(top2) // 应用程序运行结束,关闭资源 sc.stop() } }
4.4.3 PairRDDFunctions 聚合函数
在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数
据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函
数:将相同Key的Value进行聚合操作的,省去先分组再聚合。
- 第一类:分组函数groupByKey
- 第二类:分组聚合函数reduceByKey和foldByKey
但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。 - 第三类:分组聚合函数aggregateByKey
在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,
基本上都能完成任意聚合功能。
演示范例代码如下:
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * RDD中聚合函数,针对RDD中数据类型Key/Value对: * groupByKey * reduceByKey/foldByKey * aggregateByKey * combineByKey */ object SparkAggByKeyTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // 1、并行化集合创建RDD数据集 val linesSeq: Seq[String] = Seq( "hadoop scala hive spark scala sql sql", // "hadoop scala spark hdfs hive spark", // "spark hdfs spark hdfs scala hive spark" // ) val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2) // 2、分割单词,转换为二元组 val wordsRDD: RDD[(String, Int)] = inputRDD .flatMap(line => line.split("\\s+")) .map(word => word -> 1) // TODO: 先使用groupByKey函数分组,再使用map函数聚合 val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey() val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map { case (word, values) => val count: Int = values.sum word -> count } println(resultRDD.collectAsMap()) // TODO: 直接使用reduceByKey或foldByKey分组聚合 val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item) println(resultRDD2.collectAsMap()) val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item) println(resultRDD3.collectAsMap()) // TODO: 使用aggregateByKey聚合 /* def aggregateByKey[U: ClassTag] (zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue ( seqOp: (U, V) => U, // 各个分区内数据聚合操作函数 combOp: (U, U) => U // 分区间聚合结果的聚合操作函数 ): RDD[(K, U)] */ val resultRDD4 = wordsRDD.aggregateByKey(0)( (tmp: Int, item: Int) => { tmp + item }, (tmp: Int, result: Int) => { tmp + result } ) println(resultRDD4.collectAsMap()) // 应用程序运行结束,关闭资源 Thread.sleep(1000000) sc.stop() } }
4.4.4 面试题
RDD中groupByKey和reduceByKey区别???
- reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,
- 将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
- groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同
key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。
4.5 关联函数
当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。
首先回顾一下SQL JOIN,用Venn图表示如下:
RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:
具体看一下join(等值连接)函数说明:
范例演示代码:
import org.apache.sp import org.apache.spark.{SparkConf, SparkContext} /** * RDD中关联函数Join,针对RDD中数据类型为Key/Value对 * */ object SparkJoinTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // 模拟数据集 val empRDD: RDD[(Int, String)] = sc.parallelize( Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")) ) val deptRDD: RDD[(Int, String)] = sc.parallelize( Seq((1001, "sales"), (1002, "tech")) ) /* def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] */ val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD) println(joinRDD.collectAsMap()) /* def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] */ val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD) println(leftJoinRDD.collectAsMap()) // 应用程序运行结束,关闭资源 sc.stop() } }
5 函数练习
RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常
用的函数使用,更多函数在实际中使用体会,多加练习理解。
5.1 map 函数
对RDD中的每一个元素进行操作并返回操作的结果。
5.2 filter 函数
函数中返回True的被留下,返回False的被过滤掉。
5.3 flatMap 函数
对RDD中的每一个元素进行先map再压扁,最后返回操作的结果。
5.4 交集、并集、差集、笛卡尔积
数学集合中操作,类似Scala集合类Set中相关函数,注意类型要一致。
5.5 distinct 函数
对RDD中元素进行去重,与Scala集合中distinct类似。
5.6 first、take、top 函数
从RDD中获取某些元素,比如first为第一个元素,take为前N个元素,top为最大的N个元素。
5.7 keys、values 函数
针对RDD中数据类型为KeyValue对时,获取所有key和value的值,类似Scala中Map集合。
5.8 mapValues 函数
mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后。
5.9 collectAsMap 函数
当RDD中数据类型为Key/Value对时,转换为Map集合。
5.10 mapPartitionsWithIndex 函数
取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的。