Spark操作—aggregate、aggregateByKey详解

简介:

aggregate函数

       将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

 

      seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.   seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

实例程序:

scala> val rdd = List(1,2,3,4,5,6,7,8,9)  
rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9) 
rdd.par.aggregate((0,0))(  
(acc,number) => (acc._1 + number, acc._2 + 1),  
(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)  
)  
res0: (Int, Int) = (45,9)  
  
scala> res0._1 / res0._2  
res1: Int = 5  
  
过程大概这样:  
首先,初始值是(0,0),这个值在后面2步会用到。  
然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的T,这里即是List中的元素。所以acc._1 + number,acc._2 + 1的过程如下。  
  
1.  0+1,  0+1  
2.  1+2,  1+1  
3.  3+3,  2+1  
4.  6+4,  3+1  
5.  10+5,  4+1  
6.  15+6,  5+1  
7.  21+7,  6+1  
8.  28+8,  7+1  
9.  36+9,  8+1  
</span><strong><span style="color:#ff0000;">结果即是(45,9)</span></strong><span style="color:#330000;">。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了.

  

aggregateByKey函数:

        对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。

实例程序

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
  
object AggregateByKeyOp {  
  def main(args:Array[String]){  
     val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")  
    val sc: SparkContext = new SparkContext(sparkConf)  
       
     val data=List((1,3),(1,2),(1,4),(2,3))  
     val rdd=sc.parallelize(data, 2)  
       
     //合并不同partition中的值,a,b得数据类型为zeroValue的数据类型  
     def combOp(a:String,b:String):String={  
       println("combOp: "+a+"\t"+b)  
       a+b  
     }  
     //合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型  
      def seqOp(a:String,b:Int):String={  
        println("SeqOp:"+a+"\t"+b)  
        a+b  
      }  
      rdd.foreach(println)  
      //zeroValue:中立值,定义返回value的类型,并参与运算  
      //seqOp:用来在同一个partition中合并值  
      //combOp:用来在不同partiton中合并值  
      val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)  
      sc.stop()  
  }  
 }
 运行结果:
 将数据拆分成两个分区

//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)

//分区一相同key的数据进行合并
seq: 100     3   //(1,3)开始和中立值进行合并  合并结果为 1003
seq: 1003     2   //(1,2)再次合并 结果为 10032

//分区二相同key的数据进行合并
seq: 100     4  //(1,4) 开始和中立值进行合并 1004
seq: 100     3  //(2,3) 开始和中立值进行合并 1003

将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)(2,1003)//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032     1004(1,100321004)

原文来自csdn 博客:http://blog.csdn.net/u013514928 午夜阳光

本文转自  ChinaUnicom110  51CTO博客,原文链接:http://blog.51cto.com/xingyue2011/1952415

相关文章
|
8月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
173 0
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
168 0
|
8月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
330 0
|
8月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
366 1
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
分布式计算 DataWorks MaxCompute
DataWorks操作报错合集之spark操作odps,写入时报错,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
8月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
7月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
下一篇
开通oss服务