高级特性_累加器 | 学习笔记

简介: 快速学习 高级特性_累加器

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段高级特性_累加器】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/12031


高级特性_累加器


内容简介

一、出现的问题

二、全局累加器

三、自定义累加器

 

闭包相关的内容,全局累加器,大致有三个步骤,了解问题所在,代码怎么写,ui 呈现出来的样子。

通过本节的学习,就可以了解什么是全局累加器,怎样解决在全局、分布式环境下去累计变量。

 

一、出现的问题

Var count = 0

Val confing = new sparkconf().setAPPname(“job_ana”).setmaster(“local[6]”)

Val sc = new sparkcontextconfig

sc.parallelizeseq12345))

.foreachcount +_)

// parallelize 创建了一个新的 RDD ,有五个元素,接下来 foreachforeach 一次,累计 count

// count 计数

假设三个 executor

image.png

Task 会被分发到不同的 executor 中执行,默认 count 值为0

Executor1中的task 在计算时,值用的为 count=0executor2也运用的初始值 count=0executor3同上。

问题是它们会分别使用初始值等于0进行计算,假如第一个 executor1上运行了一个 task1 ,这个task 1里面只有一个12executor2里面运行的一个 task2,其里面有34executor 3里有一个45

那么结果分别是第一个 count=2;第二个 count=2;第三个 count=1,这里明显是错误的,会割裂出来,布式式每一个节点都有一个 count。会把结果序列化再反序列化出来,结果是这段代码不能正常工作

 

二、全局累加器

1.Accumulators(累加器)是一个只支持 added(添加)的分布式变量,可以在分布式环境下保持一致性,并能够做到高效的并发

代码,可以再分布式上保证累加是没有问题的

//分布式变量和普通变量的区别是 count 是没有办法在分布式环境下保持一致的,而 accumulators 在分布式环境下也可以保持一致,不论哪个 executor,都能正常累加,不会出现线程安全的继承问题

进入小窗口中,方便查看 UI

Scala val counter = sc.longAccumulator(counter)

Counter:org.apache.spark.until.longAccumlator=longAccu

ulator(id:0,name:samea(counter),value:0)

Scala> val result =

sc.parallelize(seq(1,2,3,4,5,)).foreach(counter.add(_))

Result:until = ()

Scala>counter.value

Res0:long = 15

Scala>

// longccumulator 创建出来后,可以在结论中做相应的累加,累

加完后得出最后正确的结果

// 创建出 sc.longAccumulator,可以在所有节点中进行相应的累

2.Counter 的使用分三个步骤

1)可以在任意地方,通过 longaccumulator 创建一个新的 accumulator

2)在算子中使用 longaccumulator 进行相应的累加

3)执行完后,可以通过 value 得出相应的值

3.进入浏览器,打开4040端口

image.png

进入 job

image.png

值是15

自定义 accumulator

Longaccumulator(“counter) 只能对数进行累加,如果像累加字符串、变量、对象,这里就需要用到自定义累加器


三、自定义累加器

1.打开 idea,创建一个新的类 accumulator

Import org.apache.spark.until.accumulatorv2

Import org.junit test

Import scala.collection.mutable

Class accumulator {

/**

*RDD -(1,2,3,4,5)-set(1,2,3,4,5)

*/

@test

Def acc():unit ={

自定义类

Class numaccumulator extends accumlatorv2[string] =mutable.set[string]] {

Private val nums: mutable set[string] = set() // mutable 可变量

/**

* 告诉 spark 框架,这个给累加器对象是否是空的

*/

Override def iszero:boolean = {

Nums.isempty

}

/**

* 提供给 spark 框架一个拷贝的累加器

*@return

/*

Override def copy():accumulatorv2[string,set[string]] = {

Val newaccumulator = new numaccumlator()   // 可以使用对象 nums 进行同步

Numssynchronized {

Newaccumulator.nums ++= this.nums    // 空的 nums,加入上述 nums 的内容

}

Newaccumulator 返回

/** 帮助 spark 框架,清理累加器的内容

*/

Override def reset():unit = {

Nums.clear()

}

/**

* 外部传入要累加的内容,在这个方法中进行累加

*/

Override def add(v:string):unit = {

Nums += v

}

/**

* 累加器在进行累加的时候,可能每个分布式节点都有一个实例

*在最后 diver 进行一次合并,把所有的示例的内容合并起来,会调用这个 merge 的方法进行合并

*/

Override def mergeotheraccumulatorv2[string,set[string]]):unit = {

Nums ++= other.nums.value

}

/**

* 提供给外部累加结果

*为什么一定要给不可变的,因为外部有可能在进行修改,如果是可变的集合,其外部的修改会影响内部的值

*/

// copy 累加器复制起来,进行合并

重写 value 方法

Override def valueset[string} = {

Nums.toset  //不可变

使用新的类,继承 accumulatorv2,把方法实现

2.示例

@test

Def acc(): unit = {

Val config = new sparkconf().setappname(acc).setmaster(local[6])

Val sc = new sparkcontext(config)

Val numacc = new nemaccumulator()  //创建累加器

// 注册给 spark

Sc.register(numacc,name =num)

Sc.parallelize(seq(1,2,3))

.foreach(item =>numacc.add(item))

Println(numacc.value)

接下来使用 Sc.stop()关闭,这样自定义累加器就实现了。

运行结果没有问题。

image.png

相关文章
CSDN博客如何添加微信公众号二维码
CSDN博客如何添加微信公众号二维码
500 0
|
10月前
|
存储 传感器 编解码
ROS机器视觉入门:从基础到人脸识别与目标检测
前言 从本文开始,我们将开始学习ROS机器视觉处理,刚开始先学习一部分外围的知识,为后续的人脸识别、目标跟踪和YOLOV5目标检测做准备工作。我采用的笔记本是联想拯救者游戏本,系统采用Ubuntu20.04,ROS采用noetic。 颜色编码格式,图像格式和视频压缩格式 (1)RGB和BGR:这是两种常见的颜色编码格式,分别代表了红、绿、蓝三原色。不同之处在于,RGB按照红、绿、蓝的顺序存储颜色信息,而BGR按照蓝、绿、红的顺序存储。 rgb8图像格式:常用于显示系统,如电视和计算机屏幕。 RGB值以8 bits表示每种颜色,总共可以表示256×256×256=16777216种颜色
407 70
|
安全 开发工具 数据安全/隐私保护
代码管理记录(一): 码云Gitee代码提交和维护
本文介绍了Gitee平台,提供了代码托管服务,并详细说明了从新建仓库到代码提交的步骤。
355 1
代码管理记录(一): 码云Gitee代码提交和维护
|
Ubuntu 网络协议 Linux
【附安装包】CentOS7(Linux)详细安装教程(手把手图文详解版)
【附安装包】CentOS7(Linux)详细安装教程(手把手图文详解版)
5122 2
|
存储 SQL 关系型数据库
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案
MySQL如何进行分库分表、数据迁移?从相关概念、使用场景、拆分方式、分表字段选择、数据一致性校验等角度阐述MySQL数据库的分库分表方案。
1641 15
一篇文章搞懂MySQL的分库分表,从拆分场景、目标评估、拆分方案、不停机迁移、一致性补偿等方面详细阐述MySQL数据库的分库分表方案
|
机器学习/深度学习 人工智能 前端开发
AI计算机视觉笔记三:WEB端部署YOLOv5
本文档介绍了如何将YOLOv5目标检测模型部署到Web端的方法,包括基于Flask和Streamlit两种实现方案。首先创建Python虚拟环境并安装必要的依赖库。接着详细展示了Flask方案下的前端HTML页面与后端Python逻辑代码,该方案利用Flask框架搭建服务器,处理实时视频流,并显示检测结果。随后介绍了Streamlit方案,该方案更简洁直观,适合快速开发交互式的机器学习应用。通过`streamlit run`命令即可启动应用,支持图像、视频及实时摄像头的目标检测演示。两种部署方式各有优势,Flask灵活性高,适用于复杂项目;而Streamlit则易于上手,便于快速原型设计。
1515 0
|
数据采集 机器学习/深度学习 数据可视化
数据挖掘实战:Python在金融数据分析中的应用案例
Python在金融数据分析中扮演关键角色,用于预测市场趋势和风险管理。本文通过案例展示了使用Python库(如pandas、numpy、matplotlib等)进行数据获取、清洗、分析和建立预测模型,例如计算苹果公司(AAPL)股票的简单移动平均线,以展示基本流程。此示例为更复杂的金融建模奠定了基础。【6月更文挑战第13天】
2776 3
|
存储 缓存 运维
带你读《云原生架构白皮书2022新版》——主要架构模式(上)
带你读《云原生架构白皮书2022新版》——主要架构模式(上)
1286 89
|
存储 关系型数据库 MySQL
MySQL删除外键、增加外键及删除主键、增加主键
MySQL删除外键、增加外键及删除主键、增加主键
217 0
嵌入式 IIC(I2C)协议
嵌入式 IIC(I2C)协议