开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:高级特性_累加器】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/12031
高级特性_累加器
内容简介
一、出现的问题
二、全局累加器
三、自定义累加器
闭包相关的内容,全局累加器,大致有三个步骤,了解问题所在,代码怎么写,ui 呈现出来的样子。
通过本节的学习,就可以了解什么是全局累加器,怎样解决在全局、分布式环境下去累计变量。
一、出现的问题
Var count = 0
Val confing = new sparkconf().setAPPname(“job_ana”).setmaster(“local[6]”)
Val sc = new sparkcontext(config)
sc.parallelize(seq(1,2,3,4,5))
.foreach(count +_)
// parallelize 创建了一个新的 RDD ,有五个元素,接下来 foreach,foreach 一次,累计 count
// count 计数
假设三个 executor
Task 会被分发到不同的 executor 中执行,默认 count 值为0,
Executor1中的task 在计算时,值用的为 count=0,executor2也运用的初始值 count=0,executor3同上。
问题是它们会分别使用初始值等于0进行计算,假如第一个 executor1上运行了一个 task1 ,这个task 1里面只有一个1和2;executor2里面运行的一个 task2,其里面有3,4;executor 3里有一个4,5。
那么结果分别是第一个 count=2;第二个 count=2;第三个 count=1,这里明显是错误的,会割裂出来,布式式每一个节点都有一个 count。会把结果序列化再反序列化出来,结果是这段代码不能正常工作
二、全局累加器
1.Accumulators(累加器)是一个只支持 added(添加)的分布式变量,可以在分布式环境下保持一致性,并能够做到高效的并发
代码,可以再分布式上保证累加是没有问题的
//分布式变量和普通变量的区别是 count 是没有办法在分布式环境下保持一致的,而 accumulators 在分布式环境下也可以保持一致,不论哪个 executor,都能正常累加,不会出现线程安全的继承问题
进入小窗口中,方便查看 UI
Scala val counter = sc.longAccumulator(“counter”)
Counter:org.apache.spark.until.longAccumlator=longAccu
ulator(id:0,name:samea(counter),value:0)
Scala> val result =
sc.parallelize(seq(1,2,3,4,5,)).foreach(counter.add(_))
Result:until = ()
Scala>counter.value
Res0:long = 15
Scala>
// longccumulator 创建出来后,可以在结论中做相应的累加,累
加完后得出最后正确的结果
// 创建出 sc.longAccumulator,可以在所有节点中进行相应的累
加
2.Counter 的使用分三个步骤
(1)可以在任意地方,通过 longaccumulator 创建一个新的 accumulator
(2)在算子中使用 longaccumulator 进行相应的累加
(3)执行完后,可以通过 value 得出相应的值
3.进入浏览器,打开4040端口
进入 job
值是15
自定义 accumulator
Longaccumulator(“counter”) 只能对数进行累加,如果像累加字符串、变量、对象,这里就需要用到自定义累加器
三、自定义累加器
1.打开 idea,创建一个新的类 accumulator
Import org.apache.spark.until.accumulatorv2
Import org.junit test
Import scala.collection.mutable
Class accumulator {
/**
*RDD -(1,2,3,4,5)-set(1,2,3,4,5)
*/
@test
Def acc():unit ={
自定义类
Class numaccumulator extends accumlatorv2[string] =mutable.set[string]] {
Private val nums: mutable set[string] = set() // mutable 可变量
/**
* 告诉 spark 框架,这个给累加器对象是否是空的
*/
Override def iszero:boolean = {
Nums.isempty
}
/**
* 提供给 spark 框架一个拷贝的累加器
*@return
/*
Override def copy():accumulatorv2[string,set[string]] = {
Val newaccumulator = new numaccumlator() // 可以使用对象 nums 进行同步
Nums:synchronized {
Newaccumulator.nums ++= this.nums // 空的 nums,加入上述 nums 的内容
}
Newaccumulator 返回
/** 帮助 spark 框架,清理累加器的内容
*/
Override def reset():unit = {
Nums.clear()
}
/**
* 外部传入要累加的内容,在这个方法中进行累加
*/
Override def add(v:string):unit = {
Nums += v
}
/**
* 累加器在进行累加的时候,可能每个分布式节点都有一个实例
*在最后 diver 进行一次合并,把所有的示例的内容合并起来,会调用这个 merge 的方法进行合并
*/
Override def merge(other:accumulatorv2[string,set[string]]):unit = {
Nums ++= other.nums.value
}
/**
* 提供给外部累加结果
*为什么一定要给不可变的,因为外部有可能在进行修改,如果是可变的集合,其外部的修改会影响内部的值
*/
//将 copy 累加器复制起来,进行合并
重写 value 方法
Override def value:set[string} = {
Nums.toset //不可变
使用新的类,继承 accumulatorv2,把方法实现
2.示例
@test
Def acc(): unit = {
Val config = new sparkconf().setappname(“acc”).setmaster(“local[6]”)
Val sc = new sparkcontext(config)
Val numacc = new nemaccumulator() //创建累加器
// 注册给 spark
Sc.register(numacc,name =”num”)
Sc.parallelize(seq(“1”,”2”,”3”))
.foreach(item =>numacc.add(item))
Println(numacc.value)
接下来使用 Sc.stop()关闭,这样自定义累加器就实现了。
运行结果没有问题。



