深入理解Spark:核心思想与源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

82a324c58bf7664b5b31b58e0614d7eaf6636e8a


大数据技术丛书


深入理解Spark:核心思想与源码分析

耿嘉安 著





图书在版编目(CIP)数据

深入理解Spark:核心思想与源码分析/耿嘉安著. —北京:机械工业出版社,2015.12

(大数据技术丛书)

ISBN 978-7-111-52234-8

I. 深… II.耿… III.数据处理软件 IV. TP274

中国版本图书馆CIP数据核字(2015)第280808号



深入理解Spark:核心思想与源码分析

出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

责任编辑:高婧雅 责任校对:董纪丽

印  刷: 版  次:2016年1月第1版第1次印刷

开  本:186mm×240mm 1/16 印  张:30.25

书  号:ISBN 978-7-111-52234-8 定  价:99.00元

凡购本书,如有缺页、倒页、脱页,由本社发行部调换

客服热线:(010)88379426 88361066 投稿热线:(010)88379604

购书热线:(010)68326294 88379649 68995259 读者信箱:hzit@hzbook.com

版权所有·侵权必究

封底无防伪标均为盗版

本书法律顾问:北京大成律师事务所 韩光/邹晓东 





Preface  前言

为什么写这本书

要回答这个问题,需要从我个人的经历说起。说来惭愧,我第一次接触计算机是在高三。当时跟大家一起去网吧玩CS,跟身边的同学学怎么“玩”。正是通过这种“玩”的过程,让我了解到计算机并没有那么神秘,它也只是台机器,用起来似乎并不比打开电视机费劲多少。高考填志愿的时候,凭着直觉“糊里糊涂”就选择了计算机专业。等到真正学习计算机课程的时候却又发现,它其实很难!

早在2004年,还在学校的我跟很多同学一样,喜欢看Flash,也喜欢谈论Flash甚至做Flash。感觉Flash正如它的名字那样“闪光”。那些年,在学校里,知道Flash的人可要比知道Java的人多得多,这说明当时的Flash十分火热。此外,Oracle也成为关系型数据库里的领军人物,很多人甚至觉得懂Oracle要比懂Flash、Java及其他数据库要厉害得多!

2007年,我刚刚参加工作不久。那时Struts1、Spring、Hibernate几乎可以称为那些用Java作为开发语言的软件公司的三驾马车。很快,Struts2替代了Struts1的地位,让我第一次意识到IT领域的技术更新竟然如此之快!随着很多传统软件公司向互联网公司转型,Hibernate也难以确保其地位,iBATIS诞生了!

2010年,有关Hadoop的技术图书涌入中国,当时很多公司用它只是为了数据统计、数据挖掘或者搜索。一开始,人们对于Hadoop的认识和使用可能相对有限。大约2011年的时候,关于云计算的概念在网上炒得火热,当时依然在做互联网开发的我,对其只是“道听途说”。后来跟同事借了一本有关云计算的书,回家挑着看了一些内容,也没什么收获,怅然若失!20世纪60年代,美国的军用网络作为互联网的雏形,很多内容已经与云计算中的某些说法类似。到20世纪80年代,互联网就已经启用了云计算,如今为什么又要重提这样的概念?这个问题我可能回答不了,还是交给历史吧。

2012年,国内又呈现出大数据热的态势。从国家到媒体、教育、IT等几乎所有领域,人人都在谈大数据。我的亲戚朋友中,无论老师、销售人员,还是工程师们都可以针对大数据谈谈自己的看法。我也找来一些Hadoop的书籍进行学习,希望能在其中探索到大数据的奥妙。

有幸在工作过程中接触到阿里的开放数据处理服务(open data processing service,ODPS),并且基于ODPS与其他小伙伴一起构建阿里的大数据商业解决方案—御膳房。去杭州出差的过程中,有幸认识和仲,跟他学习了阿里的实时多维分析平台—Garuda和实时计算平台—Galaxy的部分知识。和仲推荐我阅读Spark的源码,这样会对实时计算及流式计算有更深入的了解。2015年春节期间,自己初次上网查阅Spark的相关资料学习,开始研究Spark源码。还记得那时只是出于对大数据的热爱,想使自己在这方面的技术能力有所提升。

从阅读Hibernate源码开始,到后来阅读Tomcat、Spring的源码,我也在从学习源码的过程中成长,我对源码阅读也越来越感兴趣。随着对Spark源码阅读的深入,发现很多内容从网上找不到答案,只能自己“硬啃”了。随着自己的积累越来越多,突然有一天发现,我所总结的这些内容好像可以写成一本书了!从闪光(Flash)到火花(Spark),足足有11个年头了。无论是Flash、Java,还是Spring、iBATIS,我一直扮演着一个追随者,我接受这些书籍的洗礼,从未给予。如今我也是Spark的追随者,不同的是,我不再只想简单攫取,还要给予。

最后还想说一下,2016年是我从事IT工作的第10个年头,此书特别作为送给自己的10周年礼物。

本书特色

按照源码分析的习惯设计,从脚本分析到初始化再到核心内容,最后介绍Spark的扩展内容。整个过程遵循由浅入深、由深到广的基本思路。

本书涉及的所有内容都有相应的例子,以便于读者对源码的深入研究。

本书尽可能用图来展示原理,加速读者对内容的掌握。

本书讲解的很多实现及原理都值得借鉴,能帮助读者提升架构设计、程序设计等方面的能力。

本书尽可能保留较多的源码,以便于初学者能够在像地铁、公交这样的地方,也能轻松阅读。

读者对象

源码阅读是一项苦差事,人力和时间成本都很高,尤其是对于Spark陌生或者刚刚开始学习的人来说,难度可想而知。本书尽可能保留源码,使得分析过程不至于产生跳跃感,目的是降低大多数人的学习门槛。如果你是从事IT工作1~3年的新人或者是希望学习Spark核心知识的人,本书非常适合你。如果你已经对Spark有所了解或者已经在使用它,还想进一步提高自己,那么本书更适合你。

如果你是一个开发新手,对Java、Linux等基础知识不是很了解,那么本书可能不太适合你。如果你已经对Spark有深入的研究,本书也许可以作为你的参考资料。

总体说来,本书适合以下人群:

想要使用Spark,但对Spark实现原理不了解,不知道怎么学习的人;

大数据技术爱好者,以及想深入了解Spark技术内部实现细节的人;

有一定Spark使用基础,但是不了解Spark技术内部实现细节的人;

对性能优化和部署方案感兴趣的大型互联网工程师和架构师;

开源代码爱好者。喜欢研究源码的同学可以从本书学到一些阅读源码的方式与方法。

本书不会教你如何开发Spark应用程序,只是用一些经典例子演示。本书简单介绍Hadoop MapReduce、Hadoop YARN、Mesos、Tachyon、ZooKeeper、HDFS、Amazon S3,但不会过多介绍这些框架的使用,因为市场上已经有丰富的这类书籍供读者挑选。本书也不会过多介绍Scala、Java、Shell的语法,读者可以在市场上选择适合自己的书籍阅读。

如何阅读本书

本书分为三大部分(不包括附录):

准备篇(第1~2章),简单介绍了Spark的环境搭建和基本原理,帮助读者了解一些背景知识。

核心设计篇(第3~7章),着重讲解SparkContext的初始化、存储体系、任务提交与执行、计算引擎及部署模式的原理和源码分析。

扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括:SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。

本书最后还添加了几个附录,包括:附录A介绍的Spark中最常用的工具类Utils;附录B是Akka的简介与工具类AkkaUtils的介绍;附录C为Jetty的简介和工具类JettyUtils的介绍;附录D为Metrics库的简介和测量容器MetricRegistry的介绍;附录E演示了Hadoop1.0版本中的word count例子;附录F介绍了工具类CommandUtils的常用方法;附录G是关于Netty的简介和工具类NettyUtils的介绍;附录H列举了笔者编译Spark源码时遇到的问题及解决办法。

为了降低读者阅读理解Spark源码的门槛,本书尽可能保留源码实现,希望读者能够怀着一颗好奇的心,Spark当前很火热,其版本更新也很快,本书以Spark 1.2.3版本为主,有兴趣的读者也可按照本书的方式,阅读Spark的最新源码。

勘误和支持

本书内容很多,限于笔者水平有限,书中内容难免有错误之处。在本书出版后的任何时间,如果你对本书有任何问题或者意见,都可以通过邮箱beliefer@163.com或博客http://www.cnblogs.com/jiaan-geng/联系我,说出你的建议或者想法,希望与大家共同进步。

致谢

感谢苍天,让我生活在这样一个时代,能接触互联网和大数据;感谢父母,这么多年来,在学习、工作及生活上的帮助与支持;感谢妻子在生活中的照顾和谦让。

感谢杨福川和高婧雅给予本书出版的大力支持与帮助。

感谢冰夷老大和王贲老大让我有幸加入阿里,接触大数据应用;感谢和仲对Galaxy和Garuda耐心细致的讲解以及对Spark的推荐;感谢张中在百忙之中给本书写评语;感谢周亮、澄苍、民瞻、石申、清无、少侠、征宇、三步、谢衣、晓五、法星、曦轩、九翎、峰阅、丁卯、阿末、紫丞、海炎、涵康、云飏、孟天、零一、六仙、大知、井凡、隆君、太奇、晨炫、既望、宝升、都灵、鬼厉、归钟、梓撤、昊苍、水村、惜冰、惜陌、元乾等同仁在工作上的支持和帮助。


耿嘉安 于北京




 

Contents  目录

前言

准 备 篇

第1章 环境准备2

1.1 运行环境准备2

1.1.1 安装JDK3

1.1.2 安装Scala3

1.1.3 安装Spark4

1.2 Spark初体验4

1.2.1 运行spark-shell4

1.2.2 执行word count5

1.2.3 剖析spark-shell7

1.3 阅读环境准备11

1.4 Spark源码编译与调试13

1.5 小结17

第2章 Spark设计理念与基本架构18

2.1 初识Spark18

2.1.1 Hadoop MRv1的局限18

2.1.2 Spark使用场景20

2.1.3 Spark的特点20

2.2 Spark基础知识20

2.3 Spark基本设计思想22

2.3.1 Spark模块设计22

2.3.2 Spark模型设计24

2.4 Spark基本架构25

2.5 小结26

核心设计篇

第3章 SparkContext的初始化28

3.1 SparkContext概述28

3.2 创建执行环境SparkEnv30

3.2.1 安全管理器SecurityManager31

3.2.2 基于Akka的分布式消息系统ActorSystem31

3.2.3 map任务输出跟踪器mapOutputTracker32

3.2.4 实例化ShuffleManager34

3.2.5 shuffle线程内存管理器ShuffleMemoryManager34

3.2.6 块传输服务BlockTransferService35

3.2.7 BlockManagerMaster介绍35

3.2.8 创建块管理器BlockManager36

3.2.9 创建广播管理器Broadcast-Manager36

3.2.10 创建缓存管理器CacheManager37

3.2.11 HTTP文件服务器HttpFile-Server37

3.2.12 创建测量系统MetricsSystem39

3.2.13 创建SparkEnv40

3.3 创建metadataCleaner41

3.4 SparkUI详解42

3.4.1 listenerBus详解43

3.4.2 构造JobProgressListener46

3.4.3 SparkUI的创建与初始化47

3.4.4 Spark UI的页面布局与展示49

3.4.5 SparkUI的启动54

3.5 Hadoop相关配置及Executor环境变量54

3.5.1 Hadoop相关配置信息54

3.5.2 Executor环境变量54

3.6 创建任务调度器TaskScheduler55

3.6.1 创建TaskSchedulerImpl55

3.6.2 TaskSchedulerImpl的初始化57

3.7 创建和启动DAGScheduler57

3.8 TaskScheduler的启动60

3.8.1 创建LocalActor60

3.8.2 ExecutorSource的创建与注册62

3.8.3 ExecutorActor的构建与注册64

3.8.4 Spark自身ClassLoader的创建64

3.8.5 启动Executor的心跳线程66

3.9 启动测量系统MetricsSystem69

3.9.1 注册Sources70

3.9.2 注册Sinks70

3.9.3 给Sinks增加Jetty的Servlet-ContextHandler71

3.10 创建和启动ExecutorAllocation-Manager72

3.11 ContextCleaner的创建与启动73

3.12 Spark环境更新74

3.13 创建DAGSchedulerSource和BlockManagerSource76

3.14 将SparkContext标记为激活77

3.15 小结78

第4章 存储体系79

4.1 存储体系概述79

4.1.1 块管理器BlockManager的实现79

4.1.2 Spark存储体系架构81

4.2 shuffle服务与客户端83

4.2.1 Block的RPC服务84

4.2.2 构造传输上下文Transpor-tContext85

4.2.3 RPC客户端工厂Transport-ClientFactory86

4.2.4 Netty服务器TransportServer87

4.2.5 获取远程shuffle文件88

4.2.6 上传shuffle文件89

4.3 BlockManagerMaster对Block-Manager的管理90

4.3.1 BlockManagerMasterActor90

4.3.2 询问Driver并获取回复方法92

4.3.3 向BlockManagerMaster注册BlockManagerId93

4.4 磁盘块管理器DiskBlockManager94

4.4.1 DiskBlockManager的构造过程94

4.4.2 获取磁盘文件方法getFile96

4.4.3 创建临时Block方法create-TempShuffleBlock96

4.5 磁盘存储DiskStore97

4.5.1 NIO读取方法getBytes97

4.5.2 NIO写入方法putBytes98

4.5.3 数组写入方法putArray98

4.5.4 Iterator写入方法putIterator98

4.6 内存存储MemoryStore99

4.6.1 数据存储方法putBytes101

4.6.2 Iterator写入方法putIterator详解101

4.6.3 安全展开方法unrollSafely102

4.6.4 确认空闲内存方法ensureFreeSpace105

4.6.5 内存写入方法putArray107

4.6.6 尝试写入内存方法tryToPut108

4.6.7 获取内存数据方法getBytes109

4.6.8 获取数据方法getValues110

4.7 Tachyon存储TachyonStore110

4.7.1 Tachyon简介111

4.7.2 TachyonStore的使用112

4.7.3 写入Tachyon内存的方法putIntoTachyonStore113

4.7.4 获取序列化数据方法getBytes113

4.8 块管理器BlockManager114

4.8.1 移出内存方法dropFrom-Memory114

4.8.2 状态报告方法reportBlockStatus116

4.8.3 单对象块写入方法putSingle117

4.8.4 序列化字节块写入方法putBytes118

4.8.5 数据写入方法doPut118

4.8.6 数据块备份方法replicate121

4.8.7 创建DiskBlockObjectWriter的方法getDiskWriter125

4.8.8 获取本地Block数据方法getBlockData125

4.8.9 获取本地shuffle数据方法doGetLocal126

4.8.10 获取远程Block数据方法doGetRemote127

4.8.11 获取Block数据方法get128

4.8.12 数据流序列化方法dataSerializeStream129

4.9 metadataCleaner和broadcastCleaner129

4.10 缓存管理器CacheManager130

4.11 压缩算法133

4.12 磁盘写入实现DiskBlockObjectWriter133

4.13 块索引shuffle管理器IndexShuffleBlockManager135

4.14 shuffle内存管理器ShuffleMemoryManager137

4.15 小结138

第5章 任务提交与执行139

5.1 任务概述139

5.2 广播Hadoop的配置信息142

5.3 RDD转换及DAG构建144

5.3.1 为什么需要RDD144

5.3.2 RDD实现分析146

5.4 任务提交152

5.4.1 任务提交的准备152

5.4.2 finalStage的创建与Stage的划分157

5.4.3 创建Job163

5.4.4 提交Stage164

5.4.5 提交Task165

5.5 执行任务176

5.5.1 状态更新176

5.5.2 任务还原177

5.5.3 任务运行178

5.6 任务执行后续处理179

5.6.1 计量统计与执行结果序列化179

5.6.2 内存回收180

5.6.3 执行结果处理181

5.7 小结187

第6章 计算引擎188

6.1 迭代计算188

6.2 什么是shuffle192

6.3 map端计算结果缓存处理194

6.3.1 map端计算结果缓存聚合195

6.3.2 map端计算结果简单缓存200

6.3.3 容量限制201

6.4 map端计算结果持久化204

6.4.1 溢出分区文件205

6.4.2排序与分区分组207

6.4.3 分区索引文件209

6.5 reduce端读取中间计算结果210

6.5.1 获取map任务状态213

6.5.2 划分本地与远程Block215

6.5.3 获取远程Block217

6.5.4 获取本地Block218

6.6 reduce端计算219

6.6.1 如何同时处理多个map任务的中间结果219

6.6.2 reduce端在缓存中对中间计算结果执行聚合和排序220

6.7 map端与reduce端组合分析221

6.7.1 在map端溢出分区文件,在reduce端合并组合221

6.7.2 在map端简单缓存、排序分组,在reduce端合并组合222

6.7.3 在map端缓存中聚合、排序分组,在reduce端组合222

6.8 小结223

第7章 部署模式224

7.1 local部署模式225

7.2 local-cluster部署模式225

7.2.1 LocalSparkCluster的启动226

7.2.2 CoarseGrainedSchedulerBackend的启动236

7.2.3 启动AppClient237

7.2.4 资源调度242

7.2.5 local-cluster模式的任务执行253

7.3 Standalone部署模式255

7.3.1 启动Standalone模式255

7.3.2 启动Master分析257

7.3.3 启动Worker分析259

7.3.4 启动Driver Application分析261

7.3.5 Standalone模式的任务执行263

7.3.6 资源回收263

7.4 容错机制266

7.4.1 Executor异常退出266

7.4.2 Worker异常退出268

7.4.3 Master异常退出269

7.5 其他部署方案276

7.5.1 YARN277

7.5.2 Mesos280

7.6 小结282

扩 展 篇

第8章 Spark SQL284

8.1 Spark SQL总体设计284

8.1.1 传统关系型数据库SQL运行原理285

8.1.2 Spark SQL运行架构286

8.2 字典表Catalog288

8.3 Tree和TreeNode289

8.4 词法解析器Parser的设计与实现293

8.4.1 SQL语句解析的入口294

8.4.2 建表语句解析器DDLParser295

8.4.3 SQL语句解析器SqlParser296

8.4.4 Spark代理解析器SparkSQLParser299

8.5 Rule和RuleExecutor300

8.6 Analyzer与Optimizer的设计与实现302

8.6.1 语法分析器Analyzer304

8.6.2 优化器Optimizer305

8.7 生成物理执行计划306

8.8 执行物理执行计划308

8.9 Hive311

8.9.1 Hive SQL语法解析器311

8.9.2 Hive SQL元数据分析313

8.9.3 Hive SQL物理执行计划314

8.10 应用举例:JavaSparkSQL314

8.11 小结320

第9章 流式计算321

9.1 Spark Streaming总体设计321

9.2 StreamingContext初始化323

9.3 输入流接收器规范Receiver324

9.4 数据流抽象DStream325

9.4.1 Dstream的离散化326

9.4.2 数据源输入流InputDStream327

9.4.3 Dstream转换及构建DStream Graph329

9.5 流式计算执行过程分析330

9.5.1 流式计算例子CustomReceiver331

9.5.2 Spark Streaming执行环境构建335

9.5.3 任务生成过程347

9.6 窗口操作355

9.7 应用举例357

9.7.1 安装mosquitto358

9.7.2 启动mosquitto358

9.7.3 MQTTWordCount359

9.8 小结361

第10章 图计算362

10.1 Spark GraphX总体设计362

10.1.1 图计算模型363

10.1.2 属性图365

10.1.3 GraphX的类继承体系367

10.2 图操作368

10.2.1 属性操作368

10.2.2 结构操作368

10.2.3 连接操作369

10.2.4 聚合操作370

10.3 Pregel API371

10.3.1 Dijkstra算法373

10.3.2 Dijkstra的实现376

10.4 Graph的构建377

10.4.1 从边的列表加载Graph377

10.4.2 在Graph中创建图的方法377

10.5 顶点集合抽象VertexRDD378

10.6 边集合抽象EdgeRDD379

10.7 图分割380

10.8 常用算法382

10.8.1 网页排名382

10.8.2 Connected Components的应用386

10.8.3 三角关系统计388

10.9 应用举例390

10.10 小结391

第11章 机器学习392

11.1机器学习概论392

11.2 Spark MLlib总体设计394

11.3 数据类型394

11.3.1 局部向量394

11.3.2标记点395

11.3.3局部矩阵396

11.3.4分布式矩阵396

11.4基础统计398

11.4.1摘要统计398

11.4.2相关统计399

11.4.3分层抽样401

11.4.4假设检验401

11.4.5随机数生成402

11.5分类和回归405

11.5.1数学公式405

11.5.2线性回归407

11.5.3分类407

11.5.4回归410

11.6决策树411

11.6.1基本算法411

11.6.2使用例子412

11.7随机森林413

11.7.1基本算法414

11.7.2使用例子414

11.8梯度提升决策树415

11.8.1基本算法415

11.8.2使用例子416

11.9朴素贝叶斯416

11.9.1算法原理416

11.9.2使用例子418

11.10保序回归418

11.10.1算法原理418

11.10.2使用例子419

11.11协同过滤419

11.12聚类420

11.12.1K-means420

11.12.2高斯混合422

11.12.3快速迭代聚类422

11.12.4latent Dirichlet allocation422

11.12.5流式K-means423

11.13维数减缩424

11.13.1奇异值分解424

11.13.2主成分分析425

11.14特征提取与转型425

11.14.1术语频率反转425

11.14.2单词向量转换426

11.14.3标准尺度427

11.14.4正规化尺度428

11.14.5卡方特征选择器428

11.14.6Hadamard积429

11.15频繁模式挖掘429

11.16预言模型标记语言430

11.17管道431

11.17.1管道工作原理432

11.17.2管道API介绍433

11.17.3交叉验证435

11.18小结436

附录A Utils437

附录B Akka446

附录C Jetty450

附录D Metrics453

附录E Hadoop word count456

附录F CommandUtils458

附录G Netty461

附录H 源码编译错误465


准 备 篇

1章 环境准备

第2章 Spark设计理念与基本架构



第1章

环 境 准 备

凡事豫则立,不豫则废;言前定,则不跲;事前定,则不困。

—《礼记·中庸》

本章导读

在深入了解一个系统的原理、实现细节之前,应当先准备好它的源码编译环境、运行环境。如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型、部署模式等。当你通过一些途径知道了系统的原理之后,难道不会问问自己:“这是怎么做到的?”如果只是游走于系统使用、原理了解的层面,是永远不可能真正理解整个系统的。很多IDE本身带有调试的功能,每当你阅读源码,陷入重围时,调试能让我们更加理解运行期的系统。如果没有调试功能,不敢想象阅读源码会怎样困难。

本章的主要目的是帮助读者构建源码学习环境,主要包括以下内容:

在Windows环境下搭建源码阅读环境;

在Linux环境下搭建基本的执行环境;

Spark的基本使用,如spark-shell。

1.1 运行环境准备

考虑到大部分公司的开发和生成环境都采用Linux操作系统,所以笔者选用了64位的Linux。在正式安装Spark之前,先要找台好机器。为什么?因为笔者在安装、编译、调试的过程中发现Spark非常耗费内存,如果机器配置太低,恐怕会跑不起来。Spark的开发语言是Scala,而Scala需要运行在JVM之上,因而搭建Spark的运行环境应该包括JDK和Scala。

1.1.1 安装JDK

使用命令getconf LONG_BIT查看Linux机器是32位还是64位,然后下载相应版本的JDK并安装。

下载地址:

http://www.oracle.com/technetwork/java/javase/downloads/index.html

配置环境:

cd ~

vim .bash_profile 

添加如下配置:

export JAVA_HOME=/opt/java

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

由于笔者的机器上已经安装过openjdk,所以未使用以上方式,openjdk的安装命令如下:

$ su -c "yum install java-1.7.0-openjdk"

安装完毕后,使用java –version命令查看,确认安装正常,如图1-1所示。


图1-1 查看安装是否正常

1.1.2 安装Scala

下载地址:http://www.scala-lang.org/download/

选择最新的Scala版本下载,下载方法如下:

wget http://downloads.typesafe.com/scala/2.11.5/scala-2.11.5.tgz

移动到选好的安装目录,例如:

mv scala-2.11.5.tgz ~/install/

进入安装目录,执行以下命令:

chmod 755 scala-2.11.5.tgz

tar -xzvf scala-2.11.5.tgz 

配置环境:

cd ~

vim .bash_profile 

添加如下配置:

export SCALA_HOME=$HOME/install/scala-2.11.5

export PATH=$PATH:$SCALA_HOME/bin:$HOME/bin

安装完毕后输入scala,进入scala命令行说明scala安装正确,如图1-2所示。


图1-2 进入scala命令行

1.1.3 安装Spark

下载地址:http://spark.apache.org/downloads.html

选择最新的Spark版本下载,下载方法如下:

wget http://archive.apache.org/dist/spark/spark-1.2.0/spark-1.2.0-bin-hadoop1.tgz

移动到选好的安装目录,如:

mv spark-1.2.0-bin-hadoop1.tgz~/install/

进入安装目录,执行以下命令:

chmod 755 spark-1.2.0-bin-hadoop1.tgz

tar -xzvf spark-1.2.0-bin-hadoop1.tgz

配置环境:

cd ~

vim .bash_profile 

添加如下配置:

export SPARK_HOME=$HOME/install/spark-1.2.0-bin-hadoop1

1.2 Spark初体验

本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。

1.2.1 运行spark-shell

要运行spark-shell,需要先对Spark进行配置。

1)进入Spark的conf文件夹:

cd ~/install/spark-1.2.0-bin-hadoop1/conf

2)复制一份spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

3)添加如下配置:

export SPARK_MASTER_IP=127.0.0.1

export SPARK_LOCAL_IP=127.0.0.1

4)启动spark-shell:

cd ~/install/spark-1.2.0-bin-hadoop1/bin

./spark-shell

最后我们会看到spark启动的过程,如图1-3所示。


图1-3 Spark启动过程

从以上启动日志中我们可以看到SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处望文生义即可,具体内容将在后边的章节详细讲解。

1.2.2 执行word count

这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开scala命令行,然后按照以下步骤输入脚本。

1)输入val lines = sc.textFile("../README.md", 2),执行结果如图1-4所示。


图1-4 步骤1执行结果

2)输入val words = lines.flatMap(line => line.split(" ")),执行结果如图1-5所示。


图1-5 步骤2执行结果

3)输入val ones = words.map(w => (w,1)),执行结果如图1-6所示。


图1-6 步骤3执行结果

4)输入val counts = ones.reduceByKey(_ + _),执行结果如图1-7所示。


图1-7 步骤4执行结果

5)输入counts.foreach(println),任务执行过程如图1-8和图1-9所示。输出结果如图1-10所示。


图1-8 步骤5执行过程部分(一)


图1-9 步骤5执行过程部分(二)


图1-10 步骤5输出结果

在这些输出日志中,我们先是看到Spark中任务的提交与执行过程,然后看到单词计数的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第5章中展开。

1.2.3 剖析spark-shell

通过word count在spark-shell中执行的过程,我们想看看spark-shell做了什么。spark-shell中有以下一段脚本,见代码清单1-1。

代码清单1-1 spark-shell中的一段脚本

function main() {

    if $cygwin; then

stty -icanonmin 1 -echo > /dev/null 2>&1

        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"

        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"

sttyicanon echo > /dev/null 2>&1

    else

        export SPARK_SUBMIT_OPTS

        "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"

fi

}

我们看到脚本spark-shell里执行了spark-submit脚本,打开spark-submit脚本,发现其中包含以下脚本。

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。打开spark-class脚本,其中包含以下脚本,见代码清单1-2。

代码清单1-2 spark-class

if [ -n "${JAVA_HOME}" ]; then

    RUNNER="${JAVA_HOME}/bin/java"

else

    if [ `command -v java` ]; then

        RUNNER="java"

    else

       echo "JAVA_HOME is not set" >&2

       exit 1

    fi

fi


exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

读到这里,应该知道Spark启动了以SparkSubmit为主类的jvm进程。

为便于在本地对Spark进程使用远程监控,给spark-class脚本追加以下jmx配置:

JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

在本地打开jvisualvm,添加远程主机,如图1-11所示。

右击已添加的远程主机,添加JMX连接,如图1-12所示。


单击右侧的“线程”选项卡,选择main线程,然后单击“线程Dump”按钮,如图1-13所示。

从dump的内容中找到线程main的信息,如代码清单1-3所示。


图1-13 查看Spark线程

代码清单1-3 main线程dump信息

"main" - Thread t@1

    java.lang.Thread.State: RUNNABLE

        at java.io.FileInputStream.read0(Native Method)

        at java.io.FileInputStream.read(FileInputStream.java:210)

        at scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)

        at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)

        at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.

        java:933)

        at scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)

        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)

        at scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)

        at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.

        scala:80)

        at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-

        Reader.scala:43)

        at org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)

        at org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:619)

        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp 

        (SparkI-Loop.scala:968)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

        scala:916)

        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

        scala:916)

        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClass

        Loader.scala:135)

        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

        at org.apache.spark.repl.Main$.main(Main.scala:31)

        at org.apache.spark.repl.Main.main(Main.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.

        java:57)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-

        sorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:606)

        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main→SparkI-Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见代码清单1-4。

代码清单1-4 initializeSpark的实现

def initializeSpark() {

intp.beQuietDuring {

    command("""

        @transient val sc = {

            val _sc = org.apache.spark.repl.Main.interp.createSparkContext()

            println("Spark context available as sc.")

            _sc

        }

        """)

        command("import org.apache.spark.SparkContext._")

    }

}

我们看到initializeSpark调用了createSparkContext方法,createSparkContext的实现见代码清单1-5。

代码清单1-5 createSparkContext的实现

def createSparkContext(): SparkContext = {

valexecUri = System.getenv("SPARK_EXECUTOR_URI")

valjars = SparkILoop.getAddedJars

valconf = new SparkConf()

    .setMaster(getMaster())

    .setAppName("Spark shell")

    .setJars(jars)

    .set("spark.repl.class.uri", intp.classServer.uri)

if (execUri != null) {

                      conf.set("spark.executor.uri", execUri)

    }

sparkContext = new SparkContext(conf)

    logInfo("Created spark context..")

    sparkContext

}

这里最终使用SparkConf和SparkContext来完成初始化,具体内容将在第3章讲解。代码分析中涉及的repl主要用于与Spark实时交互。

1.3 阅读环境准备

准备Spark阅读环境,同样需要一台好机器。笔者调试源码的机器的内存是8 GB。源码阅读的前提是在IDE环境中打包、编译通过。常用的IDE有IntelliJ IDEA、Eclipse。笔者选择用Eclipse编译Spark,原因有二:一是由于使用多年对它比较熟悉,二是社区中使用Eclipse编译Spark的资料太少,在这里可以做个补充。在Windows系统编译Spark源码,除了安装JDK外,还需要安装以下工具。

(1)安装Scala

由于Spark 1.20版本的sbt里指定的Scala版本是2.10.4,具体见Spark源码目录下的文件\project\plugins.sbt,其中有一行:scalaVersion := "2.10.4"。所以选择下载scala-2.10.4.msi,下载地址:http://www.scala-lang.org/download/。

下载完毕,安装scala-2.10.4.msi。

(2)安装SBT

由于Scala使用SBT作为构建工具,所以需要下载SBT。下载地址:http://www.scala-sbt.org/,下载最新的安装包sbt-0.13.8.msi并安装。

(3)安装Git Bash

由于Spark源码使用Git作为版本控制工具,所以需要下载Git的客户端工具,推荐使用Git Bash,因为它更符合Linux下的操作习惯。下载地址:http://msysgit.github.io/,下载最新的版本并安装。

(4)安装Eclipse Scala IDE插件

Eclipse通过强大的插件方式支持各种IDE工具的集成,要在Eclipse中编译、调试、运行Scala程序,就需要安装Eclipse Scala IDE插件。下载地址:http://scala-ide.org/download/current.html。

由于笔者本地的Eclipse版本是Eclipse 4.4 (Luna),所以选择安装插件http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site,如图1-14所示。


图1-14 Eclipse Scala IDE插件安装地址

在Eclipse中选择Help菜单,然后选择Install New Software…选项,打开Install对话框,如图1-15所示。


图1-15 Install对话框

单击Add按钮,打开Add Repository对话框,输入插件地址,如图1-16所示。


图1-16 添加Scala IDE插件地址

全选插件的内容,完成安装,如图1-17所示。


图1-17 安装Scala IDE插件

1.4 Spark源码编译与调试

1.下载Spark源码

首先,访问Spark官网http://spark.apache.org/,如图1-18所示。


图1-18 Spark官网

单击Download Spark按钮,在下一个页面找到git地址,如图1-19所示。


图1-19 Spark官方git地址

打开Git Bash工具,输入git clone git://github.com/apache/spark.git命令将源码下载到本地,如图1-20所示。


图1-20 下载Spark源码

2.构建Scala应用

使用cmd命令行进到Spark根目录,执行sbt命令。会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完。

3.使用sbt生成Eclipse工程文件

等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟。完成时的状况如图1-21所示。


图1-21 sbt编译过程

现在我们查看Spark下的子文件夹,发现其中都生成了.project和.classpath文件。比如mllib项目下就生成了.project和.classpath文件,如图1-22所示。


图1-22 sbt生成的项目文件

4.编译Spark源码

由于Spark使用Maven作为项目管理工具,所以需要将Spark项目作为Maven项目导入Eclipse中,如图1-23所示。

单击Next按钮进入下一个对话框,如图1-24所示。


图1-23 导入Maven项目

全选所有项目,单击Finish按钮,这样就完成了导入,如图1-25所示。


导入完成后,需要设置每个子项目的build path。右击每个项目,选择“Build Path”→ “Configure Build Path…”,打开Java Build Path界面,如图1-26所示。


图1-26 Java编译目录

单击Add External JARs按钮,将Spark项目下的lib_managed文件夹的子文件夹bundles和jars内的jar包添加进来。

lib_managed/jars文件夹下有很多打好的spark的包,比如:spark-catalyst_2.10-1.3.2-SNAPSHOT.jar。这些jar包有可能与你下载的Spark源码的版本不一致,导致你在调试源码时,发生jar包冲突。所以请将它们排除出去。

Eclipse在对项目编译时,笔者本地出现了很多错误,有关这些错误的解决建议参见附录H。所有错误解决后运行mvn clean install,如图1-27所示。

5.调试Spark源码

以Spark源码自带的JavaWordCount为例,介绍如何调试Spark源码。右击JavaWord-Count.java,选择“Debug As”→“Java Application”即可。如果想修改配置参数,右击JavaWordCount.java,选择“Debug As”→“Debug Configurations…”,从打开的对话框中选择JavaWordCount,在右侧标签可以修改Java执行参数、JRE、classpath、环境变量等配置,如图1-28所示。

读者也可以在Spark源码中设置断点,进行跟踪调试。


图1-27 编译成功


图1-28 源码调试

1.5 小结

本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习。由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间。






第2章

Spark设计理念与基本架构

若夫乘天地之正,而御六气之辩,以游无穷者,彼且恶乎待哉?

—《庄子·逍遥游》

本章导读

上一章,介绍了Spark环境的搭建,为方便读者学习Spark做好准备。本章首先从Spark产生的背景开始,介绍Spark的主要特点、基本概念、版本变迁。然后简要说明Spark的主要模块和编程模型。最后从Spark的设计理念和基本架构入手,使读者能够对Spark有宏观的认识,为之后的内容做一些准备工作。

Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerkeley)的AMP实验室开发于2009年,并于2010年开源,2013年成长为Apache旗下大数据领域最活跃的开源项目之一。Spark也是基于map reduce 算法模式实现的分布式计算框架,拥有Hadoop MapReduce所具有的优点,并且解决了Hadoop MapReduce中的诸多缺陷。

2.1 初识Spark

2.1.1 Hadoop MRv1的局限

Hadoop1.0版本采用的是MRv1版本的MapReduce编程模型。MRv1版本的实现都封装在org.apache.hadoop.mapred包中,MRv1的Map和Reduce是通过接口实现的。MRv1包括三个部分:

运行时环境(JobTracker和TaskTracker);

编程模型(MapReduce);

数据处理引擎(Map任务和Reduce任务)。

MRv1存在以下不足:

可扩展性差:在运行时,JobTracker既负责资源管理又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。

可用性差:采用了单节点的Master,没有备用Master及选举操作,这导致一旦Master出现故障,整个集群将不可用。

资源利用率低:TaskTracker 使用slot等量划分本节点上的资源量。slot代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,Hadoop 调度器负责将各个TaskTracker 上的空闲slot分配给Task使用。一些Task并不能充分利用slot,而其他Task也无法使用这些空闲的资源。slot 分为Map slot 和Reduce slot 两种,分别供MapTask和Reduce Task使用。有时会因为作业刚刚启动等原因导致MapTask很多,而Reduce Task任务还没有调度的情况,这时Reduce slot也会被闲置。

不能支持多种MapReduce框架:无法通过可插拔方式将自身的MapReduce框架替换为其他实现,如Spark、Storm等。

MRv1的示意如图2-1所示。

Apache为了解决以上问题,对Hadoop进行升级改造,MRv2最终诞生了。MRv2重用了MRv1中的编程模型和数据处理引擎,但是运行时环境被重构了。JobTracker被拆分成了通用的资源调度平台(ResourceManager,RM)和负责各个计算框架的任务调度模型(ApplicationMaster,AM)。MRv2中MapReduce的核心不再是MapReduce框架,而是YARN。在以YARN为核心的MRv2中,MapReduce框架是可插拔的,完全可以替换为其他MapReduce实现,比如Spark、Storm等。MRv2的示意如图2-2所示。


Hadoop MRv2虽然解决了MRv1中的一些问题,但是由于对HDFS的频繁操作(包括计算结果持久化、数据备份及shuffle等)导致磁盘I/O成为系统性能的瓶颈,因此只适用于离线数据处理,而不能提供实时数据处理能力。

2.1.2 Spark使用场景

Hadoop常用于解决高吞吐、批量处理的业务场景,例如离线计算结果用于浏览量统计。如果需要实时查看浏览量统计信息,Hadoop显然不符合这样的要求。Spark通过内存计算能力极大地提高了大数据处理速度,满足了以上场景的需要。此外,Spark还支持SQL查询、流式计算、图计算、机器学习等。通过对Java、Python、Scala、R等语言的支持,极大地方便了用户的使用。

2.1.3 Spark的特点

Spark看到MRv1的问题,对MapReduce做了大量优化,总结如下:

快速处理能力。随着实时大数据应用越来越多,Hadoop作为离线的高吞吐、低响应框架已不能满足这类需求。Hadoop MapReduce的Job将中间输出和结果存储在HDFS中,读写HDFS造成磁盘I/O成为瓶颈。Spark允许将中间输出和结果存储在内存中,避免了大量的磁盘I/O。同时Spark自身的DAG执行引擎也支持数据在内存中的计算。Spark官网声称性能比Hadoop快100倍,如图2-3所示。即便是内存不足,需要磁盘I/O,其速度也是Hadoop的10倍以上。

易于使用。Spark现在支持Java、Scala、Python和R等语言编写应用程序,大大降低了使用者的门槛。自带了80多个高等级操作符,允许在Scala、Python、R的shell中进行交互式查询。

支持查询。Spark支持SQL及Hive SQL对数据查询。

支持流式计算。与MapReduce只能处理离线数据相比,Spark还支持实时的流计算。Spark依赖Spark Streaming对数据进行实时的处理,其流式处理能力还要强于Storm。

可用性高。Spark自身实现了Standalone部署模式,此模式下的Master可以有多个,解决了单点故障问题。此模式完全可以使用其他集群管理器替换,比如YARN、Mesos、EC2等。

丰富的数据源支持。Spark除了可以访问操作系统自身的文件系统和HDFS,还可以访问Cassandra、HBase、Hive、Tachyon以及任何Hadoop的数据源。这极大地方便了已经使用HDFS、Hbase的用户顺利迁移到Spark。

2.2 Spark基础知识

1.版本变迁

经过4年多的发展,Spark目前的版本是1.4.1。我们简单看看它的版本发展过程。

1)Spark诞生于UCBerkeley的AMP实验室(2009)。

2)Spark正式对外开源(2010年)。

3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化。

4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性。

5)Spark 0.7.0版本发布(2013-02-27),增加了更多关键特性,例如,Python API、Spark Streaming的alpha版本等。

6)Spark 0.7.2版本发布(2013-06-02),性能改进并解决了一些bug,新增API使用的例子。

7)Spark接受进入Apache孵化器(2013-06-21)。

8)Spark 0.7.3版本发布(2013-07-16),解决一些bug,更新Spark Streaming API等。

9)Spark 0.8.0版本发布(2013-09-25),一些新功能及可用性改进。

10)Spark 0.8.1版本发布(2013-12-19),支持Scala 2.9、YARN 2.2、Standalone部署模式下调度的高可用性、shuffle的优化等。

11)Spark 0.9.0版本发布(2014-02-02),增加了GraphX,机器学习新特性,流式计算新特性,核心引擎优化(外部聚合、加强对YARN的支持)等。

12)Spark 0.9.1版本发布(2014-04-09),增强使用YARN的稳定性,改进Scala和Python API的奇偶性。

13)Spark 1.0.0版本发布(2014-05-30),Spark SQL、MLlib、GraphX和Spark Streaming都增加了新特性并进行了优化。Spark核心引擎还增加了对安全YARN集群的支持。

14)Spark 1.0.1版本发布(2014-07-11),增加了Spark SQL的新特性和对JSON数据的支持等。

15)Spark 1.0.2版本发布(2014-08-05),Spark核心API及Streaming、Python、MLlib的bug修复。

16)Spark 1.1.0版本发布(2014-09-11)。

17)Spark 1.1.1版本发布(2014-11-26),Spark核心API及Streaming、Python、SQL、GraphX和MLlib的bug修复。

18)Spark 1.2.0版本发布(2014-12-18)。

19)Spark 1.2.1版本发布(2015-02-09),Spark核心API及Streaming、Python、SQL、GraphX和MLlib的bug修复。

20)Spark 1.3.0版本发布(2015-03-13)。

21)Spark 1.4.0版本发布(2015-06-11)。

22)Spark 1.4.1版本发布(2015-07-15),DataFrame API及Streaming、Python、SQL和MLlib的bug修复。

2.基本概念

要想对Spark有整体性的了解,推荐读者阅读Matei Zaharia的Spark论文。此处笔者先介绍Spark中的一些概念:

RDD(resillient distributed dataset):弹性分布式数据集。

Task:具体执行任务。Task分为ShuffleMapTask和ResultTask两种。ShuffleMapTask和ResultTask分别类似于Hadoop中的Map和Reduce。

Job:用户提交的作业。一个Job可能由一到多个Task组成。

Stage:Job分成的阶段。一个Job可能被划分为一到多个Stage。

Partition:数据分区。即一个RDD的数据可以划分为多少个分区。

NarrowDependency:窄依赖,即子RDD依赖于父RDD中固定的Partition。Narrow-Dependency分为OneToOneDependency和RangeDependency两种。

ShuffleDependency:shuffle依赖,也称为宽依赖,即子RDD对父RDD中的所有Partition都有依赖。

DAG(directed acycle graph):有向无环图。用于反映各RDD之间的依赖关系。

3. Scala与Java的比较

Spark为什么要选择Java作为开发语言?笔者不得而知。如果能对二者进行比较,也许能看出一些端倪。表2-1列出了Scala与Java的比较。

表2-1 Scala与Java的比较

比项项 Scala Java

语言类型  面向函数为主,兼有面向对象 面向对象(Java8也增加了lambda函数编程)

简洁性  非常简洁 不简洁

类型推断  丰富的类型推断,例如深度和链式的类型推断、 duck type、隐式类型转换等,但也因此增加了编译时长 少量的类型推断

可读性  一般,丰富的语法糖导致的各种奇幻用法,例如方法签名

学习成本  较高 一般

语言特性  非常丰富的语法糖和更现代的语言特性,例如 Option、模式匹配、使用空格的方法调用 丰富

并发编程  使用Actor的消息模型 使用阻塞、锁、阻塞队列等


通过以上比较似乎仍然无法判断Spark选择Java作为开发语言的原因。由于函数式编程更接近计算机思维,因此便于通过算法从大数据中建模,这应该更符合Spark作为大数据框架的理念吧!

2.3 Spark基本设计思想

2.3.1 Spark模块设计

整个Spark主要由以下模块组成:

Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交)、部署模式、存储体系、任务提交与执行、计算引擎等。

Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询。此外,还为熟悉Hadoop的用户提供Hive SQL处理能力。

Spark Streaming:提供流式计算处理能力,目前支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和简单的TCP套接字等数据源。此外,还提供窗口操作。

GraphX:提供图计算处理能力,支持分布式,Pregel提供的API可以解决图计算中的常见问题。

MLlib:提供机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的API接口大大降低了用户的学习成本。

Spark SQL、Spark Streaming、GraphX、MLlib的能力都是建立在核心引擎之上,如图2-4所示。

1. Spark核心功能

Spark Core提供Spark最基础与最核心的功能,主要包括以下功能。

SparkContext:通常而言,Driver Application的执行与输出都是通过SparkContext来完成的,在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web服务等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。SparkContext内置的DAGScheduler负责创建Job,将DAG中的RDD划分到不同的Stage,提交Stage等功能。内置的TaskScheduler负责资源的申请、任务的提交及请求集群对任务的调度等工作。

存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘I/O,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。

计算引擎:计算引擎由SparkContext中的DAGScheduler、RDD以及具体节点上的Executor负责执行的Map和Reduce任务组成。DAGScheduler和RDD虽然位于SparkContext内部,但是在任务正式提交与执行之前会将Job中的RDD组织成有向无关图(简称DAG),并对Stage进行划分,决定了任务执行阶段任务的数量、迭代计算、shuffle等过程。

部署模式:由于单节点不足以提供足够的存储及计算能力,所以作为大数据处理的Spark在SparkContext的TaskScheduler组件中提供了对Standalone部署模式的实现和Yarn、Mesos等分布式资源管理系统的支持。通过使用Standalone、Yarn、Mesos等部署模式为Task分配计算资源,提高任务的并发执行效率。除了可用于实际生产环境的Standalone、Yarn、Mesos等部署模式外,Spark还提供了Local模式和local-cluster模式便于开发和调试。

2. Spark扩展功能

为了扩大应用范围,Spark陆续增加了一些扩展功能,主要包括:

Spark SQL:SQL具有普及率高、学习成本低等特点,为了扩大Spark的应用面,增加了对SQL及Hive的支持。Spark SQL的过程可以总结为:首先使用SQL语句解析器(SqlParser)将SQL转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行。其中,规则执行器包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL类似。

Spark Streaming:Spark Streaming与Apache Storm类似,也用于流式计算。Spark Streaming支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和简单的TCP套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream是Spark Streaming中所有数据流的抽象,Dstream可以被组织为DStream Graph。Dstream本质上由一系列连续的RDD组成。

GraphX:Spark提供的分布式图计算框架。GraphX主要遵循整体同步并行(bulk synchronous parallell,BSP)计算模式下的Pregel模型实现。GraphX提供了对图的抽象Graph,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr和dstAttr用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。

MLlib:Spark提供的机器学习框架。机器学习是一门涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib目前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概率论、数据挖掘方面的数学算法。

2.3.2 Spark模型设计

1. Spark编程模型

Spark 应用程序从编写到提交、执行、输出的整个过程如图2-5所示,图中描述的步骤如下。

1)用户使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写Driver application程序。此外SQLContext、HiveContext及StreamingContext对Spark-Context进行封装,并提供了SQL、Hive及流式计算相关的API。

2)使用SparkContext提交的用户应用程序,首先会使用BlockManager和Broadcast-Manager将任务的Hadoop配置进行广播。然后由DAGScheduler将任务转换为RDD并组织成DAG,DAG还将被划分为不同的Stage。最后由TaskScheduler借助ActorSystem将任务提交给集群管理器(Cluster Manager)。

3)集群管理器(Cluster Manager)给任务分配资源,即将具体任务分配到Worker上,Worker创建Executor来处理任务的运行。Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

2. RDD计算模型

RDD可以看做是对各种数据计算模型的统一抽象,Spark的计算过程主要是RDD的迭代计算过程,如图2-6所示。RDD的迭代计算过程非常类似于管道。分区数量取决于partition数量的设定,每个分区的数据只会在一个Task中计算。所有分区可以在多个机器节点的Executor上并行执行。


图2-5 代码执行过程


图2-6 RDD计算模型

2.4 Spark基本架构

从集群部署的角度来看,Spark集群由以下部分组成:

Cluster Manager:Spark的集群管理器,主要负责资源的分配与管理。集群管理器分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给应用程序,但是并不负责对Executor的资源分配。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

Worker:Spark的工作节点。对Spark应用程序来说,由集群管理器分配得到资源的Worker节点主要负责以下工作:创建Executor,将资源和任务进一步分配给Executor,同步资源信息给Cluster Manager。

Executor:执行计算任务的一线进程。主要负责任务的执行以及与Worker、Driver App的信息同步。

Driver App:客户端驱动程序,也可以理解为客户端应用程序,用于将任务程序转换为RDD和DAG,并与Cluster Manager进行通信与调度。

这些组成部分之间的整体关系如图2-7所示。


图2-7 Spark基本架构图

2.5 小结

每项技术的诞生都会由某种社会需求所驱动,Spark正是在实时计算的大量需求下诞生的。Spark借助其优秀的处理能力、可用性高、丰富的数据源支持等特点,在当前大数据领域变得火热,参与的开发者也越来越多。Spark经过几年的迭代发展,如今已经提供了丰富的功能。笔者相信,Spark在未来必将产生更耀眼的火花。





核心设计篇

第3章 SparkContext的初始化

第4章 存储体系

第5章 任务提交与执行

第6章 计算引擎

第7章 部署模式


第3章

SparkContext的初始化

道生一, 一生二, 二生三, 三生万物。

—《道德经》

本章导读

SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码。读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径。已经熟练使用Spark的开发人员可以选择跳过本章内容。

本章将在介绍SparkContext初始化过程的同时,向读者介绍各个组件的作用,为阅读后面的章节打好基础。Spark中的组件很多,就其功能而言涉及网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI的方方面面。

3.1 SparkContext概述

Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端。了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程。

Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext初始化完毕,才能向Spark集群提交任务。在平坦的公路上,发动机只需以较低的转速、较低的功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而SparkContext的配置参数则由SparkConf负责,SparkConf就是你的操作面板。

SparkConf的构造很简单,主要是通过ConcurrentHashMap来维护各种Spark的配置属性。SparkConf代码结构见代码清单3-1。Spark的配置属性都是以“spark.”开头的字符串。

代码清单3-1 SparkConf代码结构

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

    import SparkConf._

    def this() = this(true)

    private val settings = new ConcurrentHashMap[String, String]()

    if (loadDefaults) {

        // 加载任何以spark.开头的系统属性

        for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {

            set(key, value)

        }

    }

//其余代码省略

现在开始介绍SparkContext。SparkContext的初始化步骤如下:

1)创建Spark执行环境SparkEnv;

2)创建RDD清理器metadataCleaner;

3)创建并初始化Spark UI;

4)Hadoop相关配置及Executor环境变量的设置;

5)创建任务调度TaskScheduler;

6)创建和启动DAGScheduler;

7)TaskScheduler的启动;

8)初始化块管理器BlockManager(BlockManager是存储体系的主要组件之一,将在第4章介绍);

9)启动测量系统MetricsSystem;

10)创建和启动Executor分配管理器ExecutorAllocationManager;

11)ContextCleaner的创建与启动;

12)Spark环境更新;

13)创建DAGSchedulerSource和BlockManagerSource;

14)将SparkContext标记为激活。

SparkContext的主构造器参数为SparkConf,其实现如下。

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

private val creationSite: CallSite = Utils.getCallSite()

    private val allowMultipleContexts: Boolean =

        config.getBoolean("spark.driver.allowMultipleContexts", false)

    SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

上面代码中的CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的Scala或者Spark核心类信息。Utils.getCallSite的详细信息见附录A。SparkContext默认只有一个实例(由属性spark.driver.allowMultipleContexts来控制,用户需要多个SparkContext实例时,可以将其设置为true),方法markPartiallyConstructed用来确保实例的唯一性,并将当前SparkContext标记为正在构建中。

接下来会对SparkConf进行复制,然后对各种配置信息进行校验,代码如下。

private[spark] val conf = config.clone()

conf.validateSettings()


if (!conf.contains("spark.master")) {

    throw new SparkException("A master URL must be set in your configuration")

}

if (!conf.contains("spark.app.name")) {

    throw new SparkException("An application name must be set in your configuration")

}

从上面校验的代码看到必须指定属性spark.master 和spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。

3.2 创建执行环境SparkEnv

SparkEnv是Spark的执行环境对象,其中包括众多与Executor执行相关的对象。由于在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。创建SparkEnv 主要使用SparkEnv的createDriverEnv,SparkEnv.createDriverEnv方法有三个参数:conf、isLocal和 listenerBus。

val isLocal = (master == "local" || master.startsWith("local["))

private[spark] val listenerBus = new LiveListenerBus

    conf.set("spark.executor.id", "driver")


    private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)

    SparkEnv.set(env)

上面代码中的conf是对SparkConf的复制,isLocal标识是否是单机模式,listenerBus采用监听器模式维护各类事件的处理,在3.4.1节会详细介绍。

SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:

1)创建安全管理器SecurityManager;

2)创建基于Akka的分布式消息系统ActorSystem;

3)创建Map任务输出跟踪器mapOutputTracker;

4)实例化ShuffleManager;

5)创建ShuffleMemoryManager;

6)创建块传输服务BlockTransferService;

7)创建BlockManagerMaster;

8)创建块管理器BlockManager;

9)创建广播管理器BroadcastManager;

10)创建缓存管理器CacheManager;

11)创建HTTP文件服务器HttpFileServer;

12)创建测量系统MetricsSystem;

13)创建SparkEnv。

3.2.1 安全管理器SecurityManager

SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成 secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现,参见代码清单3-2。

代码清单3-2 SecurityManager的实现

private val secretKey = generateSecretKey()


// 使用HTTP连接设置口令认证

if (authOn) {

    Authenticator.setDefault(

        new Authenticator() {

            override def getPasswordAuthentication(): PasswordAuthentication = {

                var passAuth: PasswordAuthentication = null

            val userInfo = getRequestingURL().getUserInfo()

            if (userInfo != null) {

                val  parts = userInfo.split(":", 2)

                passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())

                }

                return passAuth

            }

        }

    )

}

3.2.2 基于Akka的分布式消息系统ActorSystem

ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。消息系统可以实现并发?要解释清楚这个问题,首先应该简单介绍下Scala语言的Actor并发编程模型:Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的)传递数据。如:

Actor ! message

Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。ActorSystem便是Akka提供的用于创建分布式消息通信系统的基础类。Akka的具体信息见附录B。

正是因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。

SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,见代码清单3-3。AkkaUtils.createActorSystem方法用于启动ActorSystem,见代码清单3-4。AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int => (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreate-ActorSystem方法完成的,doCreateActorSystem的具体实现细节请见附录B。Spark的Driver中Akka的默认访问地址是akka://sparkDriver,Spark的Executor中Akka的默认访问地址是akka:// sparkExecutor。如果不指定ActorSystem的端口,那么所有节点的ActorSystem端口在每次启动时随机产生。关于startServiceOnPort的实现,请见附录A。

代码清单3-3 AkkaUtils工具类创建和启动ActorSystem

val (actorSystem, boundPort) =

    Option(defaultActorSystem) match {

        case Some(as) => (as, port)

        case None =>

            val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName

            AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)

}

代码清单3-4 ActorSystem的创建和启动

def createActorSystem(

        name: String,

        host: String,

        port: Int,

        conf: SparkConf,

        securityManager: SecurityManager): (ActorSystem, Int) = {

    val startService: Int => (ActorSystem, Int) = { actualPort =>

        doCreateActorSystem(name, host, actualPort, conf, securityManager)

    }

    Utils.startServiceOnPort(port, startService, conf, name)

}

3.2.3 map任务输出跟踪器mapOutputTracker

mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapId和reduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务的所在节点上拉取Block,这一过程叫做shuffle。每批shuffle过程都有唯一的标识shuffleId。

这里先介绍下MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleId,Array存储各个map任务对应的状态信息MapStatus。由于MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]维护序列化后的各个map任务的输出状态。其中key对应shuffleId,Array存储各个序列化MapStatus生成的字节数组。

Driver和Executor处理MapOutputTrackerMaster的方式有所不同。

如果当前应用程序是Driver,则创建MapOutputTrackerMaster,然后创建MapOutputTrackerMasterActor,并且注册到ActorSystem中。

如果当前应用程序是Executor,则创建MapOutputTrackerWorker,并从ActorSystem中找到MapOutputTrackerMasterActor。

无论是Driver还是Executor,最后都由mapOutputTracker的属性trackerActor持有MapOutputTrackerMasterActor的引用,参见代码清单3-5。

代码清单3-5 registerOrLookup方法用于查找或者注册Actor的实现

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {

    if (isDriver) {

        logInfo("Registering " + name)

        actorSystem.actorOf(Props(newActor), name = name)

    } else {

        AkkaUtils.makeDriverRef(name, conf, actorSystem)

    }

    }


    val mapOutputTracker =  if (isDriver) {

        new MapOutputTrackerMaster(conf)

    } else {

        new MapOutputTrackerWorker(conf)

}


    mapOutputTracker.trackerActor = registerOrLookup(

        "MapOutputTracker",

    new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

在后面章节大家会知道map任务的状态正是由Executor向持有的MapOutputTracker-MasterActor发送消息,将map任务状态同步到mapOutputTracker的mapStatuses和cached-SerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的,具体细节参见附录B。这里第一次使用到了Akka提供的功能,以后大家会渐渐感觉到使用Akka的便捷。

3.2.4 实例化ShuffleManager

ShuffleManager负责管理本地及远程的block数据的shuffle操作。ShuffleManager默认为通过反射方式生成的SortShuffleManager的实例,可以修改属性spark.shuffle.manager为hash来显式控制使用HashShuffleManager。SortShuffleManager通过持有的IndexShuffleBlockManager间接操作BlockManager中的DiskBlockManager将map结果写入本地,并根据shuffleId、mapId写入索引文件,也能通过MapOutputTrackerMaster中维护的mapStatuses从本地或者其他远程节点读取文件。有读者可能会问,为什么需要shuffle?Spark作为并行计算框架,同一个作业会被划分为多个任务在多个节点上并行执行,reduce的输入可能存在于多个节点上,因此需要通过“洗牌”将所有reduce的输入汇总起来,这个过程就是shuffle。这个问题以及对ShuffleManager的具体使用会在第5章和第6章详述。ShuffleManager的实例化见代码清单3-6。代码清单3-6最后创建的ShuffleMemoryManager将在3.2.5节介绍。

代码清单3-6 ShuffleManager的实例化及ShuffleMemoryManager的创建

    val shortShuffleMgrNames = Map(

        "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",

        "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")

    val shuffleMgrClass = shortShuffleMgrNames.get

OrElse(shuffleMgrName.toLowerCase, shuffleMgrName)

    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)


    val shuffleMemoryManager = new ShuffleMemoryManager(conf)

3.2.5 shuffle线程内存管理器ShuffleMemoryManager

ShuffleMemoryManager负责管理shuffle线程占有内存的分配与释放,并通过thread-Memory:mutable.HashMap[Long, Long]缓存每个线程的内存字节数,见代码清单3-7。

代码清单3-7 ShuffleMemoryManager的数据结构

private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {

    private val threadMemory = new mutable.HashMap[Long, Long]()  // threadId -> memory bytes

    def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))

getMaxMemory方法用于获取shuffle所有线程占用的最大内存,实现如下。

def getMaxMemory(conf: SparkConf): Long = {

    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)

    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)

    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong

}

从上面代码可以看出,shuffle所有线程占用的最大内存的计算公式为:

Java运行时最大内存 * Spark的shuffle最大内存占比 * Spark的安全内存占比

可以配置属性spark.shuffle.memoryFraction修改Spark的shuffle最大内存占比,配置属性spark.shuffle.safetyFraction修改Spark的安全内存占比。

ShuffleMemoryManager通常运行在Executor中,Driver中的ShuffleMemoryManager 只有在local模式下才起作用。

3.2.6 块传输服务BlockTransferService

BlockTransferService默认为NettyBlockTransferService(可以配置属性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的异步事件驱动的网络应用框架,提供web服务及客户端,获取远程节点上Block的集合。

val blockTransferService =

    conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {

        case "netty" =>

            new NettyBlockTransferService(conf, securityManager, numUsableCores)

        case "nio" =>

            new NioBlockTransferService(conf, securityManager)

    }

NettyBlockTransferService的具体实现将在第4章详细介绍。这里大家可能觉得奇怪,这样的网络应用为何也要放在存储体系?大家不妨先带着疑问,直到你真正了解了存储体系。

3.2.7 BlockManagerMaster介绍

BlockManagerMaster负责对Block的管理和协调,具体操作依赖于BlockManager-MasterActor。Driver和Executor处理BlockManagerMaster的方式不同:

如果当前应用程序是Driver,则创建BlockManagerMasterActor,并且注册到Actor-System中。

如果当前应用程序是Executor,则从ActorSystem中找到BlockManagerMasterActor。

无论是Driver还是Executor,最后BlockManagerMaster的属性driverActor将持有对BlockManagerMasterActor的引用。BlockManagerMaster的创建代码如下。

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(

    "BlockManagerMaster",

    new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

registerOrLookup已在3.2.3节介绍过了,不再赘述。BlockManagerMaster及BlockManager-MasterActor的具体实现将在第4章详细介绍。

3.2.8 创建块管理器BlockManager

BlockManager负责对Block的管理,只有在BlockManager的初始化方法initialize被调用后,它才是有效的。BlockManager作为存储系统的一部分,具体实现见第4章。BlockManager的创建代码如下。

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,

    serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,numUsableCores)

3.2.9 创建广播管理器BroadcastManager

BroadcastManager用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。如果为了容灾,也会复制到其他节点上。创建BroadcastManager的代码实现如下。

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

BroadcastManager必须在其初始化方法initialize被调用后,才能生效。initialize方法实际利用反射生成广播工厂实例broadcastFactory(可以配置属性spark.broadcast.factory指定,默认为org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的广播方法newBroadcast实际代理了工厂broadcastFactory的newBroadcast方法来生成广播对象。unbroadcast方法实际代理了工厂broadcastFactory的unbroadcast方法生成非广播对象。BroadcastManager的initialize、unbroadcast及newBroadcast方法见代码清单3-8。

代码清单3-8 BroadcastManager的实现

private def initialize() {

    synchronized {

        if (!initialized) {

            val broadcastFactoryClass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

            broadcastFactory =

                Class.forName(broadcastFactoryClass).newInstance.asInstanceOf [BroadcastFactory]

            broadcastFactory.initialize(isDriver, conf, securityManager)

            initialized = true

        }

    }

    }


    private val nextBroadcastId = new AtomicLong(0)


    def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {

        broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())

    }


    def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {

        broadcastFactory.unbroadcast(id, removeFromDriver, blocking)

    }

}

3.2.10 创建缓存管理器CacheManager

CacheManager用于缓存RDD某个分区计算后的中间结果,缓存计算结果发生在迭代计算的时候,将在6.1节讲到。而CacheManager将在4.10节详细描述。创建CacheManager的代码如下。

val cacheManager = new CacheManager(blockManager)

3.2.11 HTTP文件服务器HttpFileServer

HttpFileServer的创建参见代码清单3-9。HttpFileServer主要提供对jar及其他文件的http访问,这些jar包包括用户上传的jar包。端口由属性spark.fileserver.port配置,默认为0,表示随机生成端口号。

代码清单3-9 HttpFileServer的创建

val httpFileServer =

    if (isDriver) {

        val fileServerPort = conf.getInt("spark.fileserver.port", 0)

        val server = new HttpFileServer(conf, securityManager, fileServerPort)

        server.initialize()

        conf.set("spark.fileserver.uri",  server.serverUri)

        server

    } else {

        null

    }

HttpFileServer的初始化过程见代码清单3-10,主要包括以下步骤:

1)使用Utils工具类创建文件服务器的根目录及临时目录(临时目录在运行时环境关闭时会删除)。Utils工具的详细介绍见附录A。

2)创建存放jar包及其他文件的文件目录。

3)创建并启动HTTP服务。

代码清单3-10 HttpFileServer的初始化

def initialize() {

    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")

    fileDir = new File(baseDir, "files")

    jarDir = new File(baseDir, "jars")

    fileDir.mkdir()

    jarDir.mkdir()

    logInfo("HTTP File server directory is " + baseDir)

    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")

    httpServer.start()

    serverUri = httpServer.uri

    logDebug("HTTP file server started at: " + serverUri)

}

HttpServer的构造和start方法的实现中,再次使用了Utils的静态方法startServiceOnPort,因此会回调doStart方法,见代码清单3-11。有关Jetty的API使用参见附录C。

代码清单3-11 HttpServer的启动

def start() {

    if (server != null) {

        throw new ServerStateException("Server is already started")

    } else {

        logInfo("Starting HTTP Server")

        val (actualServer, actualPort) =

            Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)

        server = actualServer

        port = actualPort

    }

}

doStart方法中启动内嵌的Jetty所提供的HTTP服务,见代码清单3-12。

代码清单3-12 HttpServer的启动功能实现

private def doStart(startPort: Int): (Server, Int) = {

    val server = new Server()

    val connector = new SocketConnector

    connector.setMaxIdleTime(60 * 1000)

    connector.setSoLingerTime(-1)

    connector.setPort(startPort)

    server.addConnector(connector)


    val threadPool = new QueuedThreadPool

    threadPool.setDaemon(true)

    server.setThreadPool(threadPool)

    val resHandler = new ResourceHandler

    resHandler.setResourceBase(resourceBase.getAbsolutePath)


    val handlerList = new HandlerList

    handlerList.setHandlers(Array(resHandler, new DefaultHandler))


    if (securityManager.isAuthenticationEnabled()) {

        logDebug("HttpServer is using security")

        val sh = setupSecurityHandler(securityManager)

        // make sure we go through security handler to get resources

        sh.setHandler(handlerList)

        server.setHandler(sh)

    } else {

        logDebug("HttpServer is not using security")

        server.setHandler(handlerList)

    }


    server.start()

    val actualPort = server.getConnectors()(0).getLocalPort


    (server, actualPort)

}

3.2.12 创建测量系统MetricsSystem

MetricsSystem是Spark的测量系统,创建MetricsSystem的代码如下。

val metricsSystem = if (isDriver) {

        MetricsSystem.createMetricsSystem("driver", conf, securityManager)

    } else {

        conf.set("spark.executor.id", executorId)

        val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)

        ms.start()

        ms

    }

上面调用的createMetricsSystem方法实际创建了MetricsSystem,代码如下。

def createMetricsSystem(

    instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {

    new MetricsSystem(instance, conf, securityMgr)

}

构造MetricsSystem的过程最重要的是调用了MetricsConfig的initialize方法,见代码清单3-13。

代码清单3-13 MetricsConfig的初始化

def initialize() {

    setDefaultProperties(properties)


    var is: InputStream = null

    try {

        is = configFile match {

            case Some(f) => new FileInputStream(f)

            case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)

        }


        if (is != null) {

            properties.load(is)

        }

    } catch {

        case e: Exception => logError("Error loading configure file", e)

    } finally {

        if (is != null) is.close()

    }


    propertyCategories = subProperties(properties, INSTANCE_REGEX)

    if (propertyCategories.contains(DEFAULT_PREFIX)) {

        import scala.collection.JavaConversions._


        val defaultProperty = propertyCategories(DEFAULT_PREFIX)

        for { (inst, prop) <- propertyCategories

            if (inst != DEFAULT_PREFIX)

            (k, v) <- defaultProperty

            if (prop.getProperty(k) == null) } {

        prop.setProperty(k, v)

        }

    }

}

从以上实现可以看出,MetricsConfig的initialize方法主要负责加载metrics.properties文件中的属性配置,并对属性进行初始化转换。

例如,将属性

{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, master.sink.servlet.path=/metrics/master/json}

转换为

Map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json})

3.2.13 创建SparkEnv

当所有的基础组件准备好后,最终使用下面的代码创建执行环境SparkEnv。

new SparkEnv(executorId, actorSystem, serializer, closureSerializer, cacheManager,

        mapOutputTracker, shuffleManager, broadcastManager, blockTransferService,

    blockManager, securityManager, httpFileServer, sparkFilesDir, 

metricsSystem, shuffleMemoryManager, conf)

serializer和closureSerializer都是使用Class.forName反射生成的org.apache.spark.serializer.JavaSerializer类的实例,其中closureSerializer实例特别用来对Scala中的闭包进行序列化。

3.3 创建metadataCleaner

SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =

    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

我们仔细看看MetadataCleaner的实现,见代码清单3-14。

代码清单3-14 MetadataCleaner的实现

private[spark] class MetadataCleaner(

        cleanerType: MetadataCleanerType.MetadataCleanerType,

        cleanupFunc: (Long) => Unit,

        conf: SparkConf)

    extends Logging

{

    val name = cleanerType.toString


    private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)

    private val periodSeconds = math.max(10, delaySeconds / 10)

    private val timer = new Timer(name + " cleanup timer", true)


    private val task = new TimerTask {

        override def run() {

        try {

            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))

            logInfo("Ran metadata cleaner for " + name)

        } catch {

            case e: Exception => logError("Error running cleanup task for " + name, e)

        }

      }

    }


    if (delaySeconds > 0) {

        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)

    }


    def cancel() {

        timer.cancel()

    }

}

从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) => Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。

private[spark] def cleanup(cleanupTime: Long) {

    persistentRdds.clearOldValues(cleanupTime)

}

3.4 SparkUI详解

任何系统都需要提供监控功能,用浏览器能访问具有样式及布局并提供丰富监控数据的页面无疑是一种简单、高效的方式。SparkUI就是这样的服务,它的架构如图3-1所示。

在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。


图3-1 SparkUI架构

我们先简单介绍图3-1中的各个组件:DAGScheduler是主要的产生各类SparkListener-Event的源头,它将各种SparkListenerEvent发送到listenerBus的事件队列中,listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。从图3-1中还可以看到Spark里定义了很多监听器SparkListener的实现,包括JobProgressListener、EnvironmentListener、StorageListener、ExecutorsListener,它们的类继承体系如图3-2所示。


图3-2 SparkListener的类继承体系

3.4.1 listenerBus详解

listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus由以下部分组成:

事件阻塞队列:类型为LinkedBlockingQueue[SparkListenerEvent],固定大小是10 000;

监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener。

事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。因此使用listenerBus这个名字再合适不过了,到站就下车。listenerBus的实现见代码清单3-15。

代码清单3-15 LiveListenerBus的事件处理实现

private val EVENT_QUEUE_CAPACITY = 10000

    private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

    private var queueFullErrorMessageLogged = false

    private var started = false

    // A counter that represents the number of events produced and consumed in the queue

    private val eventLock = new Semaphore(0)


    private val listenerThread = new Thread("SparkListenerBus") {

      setDaemon(true)

      override def run(): Unit = Utils.logUncaughtExceptions {

        while (true) {

            eventLock.acquire()

            // Atomically remove and process this event

            LiveListenerBus.this.synchronized {

                val event = eventQueue.poll

                if (event == SparkListenerShutdown) {

                    // Get out of the while loop and shutdown the daemon thread

                    return

                }

                Option(event).foreach(postToAll)

            }

        }

    }

}


def start() {

    if (started) {

        throw new IllegalStateException("Listener bus already started!")

    }

    listenerThread.start()

    started = true

    }

def post(event: SparkListenerEvent) {

    val eventAdded = eventQueue.offer(event)

    if (eventAdded) {

        eventLock.release()

    } else {

        logQueueFullErrorMessage()

    }

}

  

def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }


def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }


def stop() {

   if (!started) {

        throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")

    }

    post(SparkListenerShutdown)

    listenerThread.join()

}

LiveListenerBus中调用的postToAll方法实际定义在父类SparkListenerBus中,如代码清单3-16所示。

代码清单3-16 SparkListenerBus中的监听器调用

protected val sparkListeners = new ArrayBuffer[SparkListener]

    with mutable.SynchronizedBuffer[SparkListener]


def addListener(listener: SparkListener) {

    sparkListeners += listener

}


def postToAll(event: SparkListenerEvent) {

    event match {

        case stageSubmitted: SparkListenerStageSubmitted =>

            foreachListener(_.onStageSubmitted(stageSubmitted))

        case stageCompleted: SparkListenerStageCompleted =>

            foreachListener(_.onStageCompleted(stageCompleted))

        case jobStart: SparkListenerJobStart =>

            foreachListener(_.onJobStart(jobStart))

        case jobEnd: SparkListenerJobEnd =>

            foreachListener(_.onJobEnd(jobEnd))

        case taskStart: SparkListenerTaskStart =>

            foreachListener(_.onTaskStart(taskStart))

        case taskGettingResult: SparkListenerTaskGettingResult =>

            foreachListener(_.onTaskGettingResult(taskGettingResult))

        case taskEnd: SparkListenerTaskEnd =>

            foreachListener(_.onTaskEnd(taskEnd))

        case environmentUpdate: SparkListenerEnvironmentUpdate =>

            foreachListener(_.onEnvironmentUpdate(environmentUpdate))

        case blockManagerAdded: SparkListenerBlockManagerAdded =>

            foreachListener(_.onBlockManagerAdded(blockManagerAdded))

        case blockManagerRemoved: SparkListenerBlockManagerRemoved =>

            foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))

        case unpersistRDD: SparkListenerUnpersistRDD =>

            foreachListener(_.onUnpersistRDD(unpersistRDD))

        case applicationStart: SparkListenerApplicationStart =>

            foreachListener(_.onApplicationStart(applicationStart))

        case applicationEnd: SparkListenerApplicationEnd =>

            foreachListener(_.onApplicationEnd(applicationEnd))

        case metricsUpdate: SparkListenerExecutorMetricsUpdate =>

            foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))

        case SparkListenerShutdown =>

    }

}


private def foreachListener(f: SparkListener => Unit): Unit = {

    sparkListeners.foreach { listener =>

        try {

            f(listener)

        } catch {

            case e: Exception =>

            logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)

       }

    }

}

3.4.2 构造JobProgressListener

我们以JobProgressListener为例来讲解SparkListener。JobProgressListener是SparkContext中一个重要的组成部分,通过监听listenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。创建JobProgressListener的代码如下。

private[spark] val jobProgressListener = new JobProgressListener(conf)

listenerBus.addListener(jobProgressListener)


val statusTracker = new SparkStatusTracker(this)

JobProgressListener的作用是通过HashMap、ListBuffer等数据结构存储JobId及对应的JobUIData信息,并按照激活、完成、失败等job状态统计。对于StageId、StageInfo等信息按照激活、完成、忽略、失败等Stage状态统计,并且存储StageId与JobId的一对多关系。这些统计信息最终会被JobPage和StagePage等页面访问和渲染。JobProgressListener的数据结构见代码清单3-17。

代码清单3-17 JobProgressListener维护的信息

class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {


    import JobProgressListener._


    type JobId = Int

    type StageId = Int

    type StageAttemptId = Int

    type PoolName = String

    type ExecutorId = String


    // Jobs:

    val activeJobs = new HashMap[JobId, JobUIData]

    val completedJobs = ListBuffer[JobUIData]()

    val failedJobs = ListBuffer[JobUIData]()

    val jobIdToData = new HashMap[JobId, JobUIData]


    // Stages:

    val activeStages = new HashMap[StageId, StageInfo]

    val completedStages = ListBuffer[StageInfo]()

    val skippedStages = ListBuffer[StageInfo]()

    val failedStages = ListBuffer[StageInfo]()

    val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]

    val stageIdToInfo = new HashMap[StageId, StageInfo]

    val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]

    val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()

    var numCompletedStages = 0 // 总共完成的Stage数量

    var numFailedStages = 0 // 总共失败的Stage数量


    // Misc:

    val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()

    def blockManagerIds = executorIdToBlockManagerId.values.toSeq


    var schedulingMode: Option[SchedulingMode] = None


    // number of non-active jobs and stages (there is no limit for active jobs   and stages):

    val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)

    val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

JobProgressListener 实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法正是在listenerBus的驱动下,改变JobProgress-Listener中的各种Job、Stage相关的数据。

3.4.3 SparkUI的创建与初始化

SparkUI的创建,见代码清单3-18。

代码清单3-18 SparkUI的声明

private[spark] val ui: Option[SparkUI] =

    if (conf.getBoolean("spark.ui.enabled", true)) {

        Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,

            env.securityManager,appName))

    } else {

        None

    }


ui.foreach(_.bind())

可以看到如果不需要提供SparkUI服务,可以将属性spark.ui.enabled修改为false。其中createLiveUI实际是调用了create方法,见代码清单3-19。

代码清单3-19 SparkUI的创建

def createLiveUI(

        sc: SparkContext,

        conf: SparkConf,

        listenerBus: SparkListenerBus,

        jobProgressListener: JobProgressListener,

        securityManager: SecurityManager,

        appName: String): SparkUI =  {

    create(Some(sc), conf, listenerBus, securityManager, appName,

        jobProgressListener = Some(jobProgressListener))

  }

create方法的实现参见代码清单3-20。

代码清单3-20 creat方法的实现

private def create(

        sc: Option[SparkContext],

        conf: SparkConf,

        listenerBus: SparkListenerBus,

        securityManager: SecurityManager,

        appName: String,

        basePath: String = "",

        jobProgressListener: Option[JobProgressListener] = None): SparkUI = {


    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {

        val listener = new JobProgressListener(conf)

        listenerBus.addListener(listener)

        listener

    }


    val environmentListener = new EnvironmentListener

    val storageStatusListener = new StorageStatusListener

    val executorsListener = new ExecutorsListener(storageStatusListener)

    val storageListener = new StorageListener(storageStatusListener)


    listenerBus.addListener(environmentListener)

    listenerBus.addListener(storageStatusListener)

    listenerBus.addListener(executorsListener)

    listenerBus.addListener(storageListener)


    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,

        executorsListener, _jobProgressListener, storageListener, appName, basePath)

}

根据代码清单3-20,可以知道在create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener。例如,用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener等。最后创建SparkUI,Spark UI服务默认是可以被杀掉的,通过修改属性spark.ui.killEnabled为false可以保证不被杀死。initialize方法会组织前端页面各个Tab和Page的展示及布局,参见代码清单3-21。

代码清单3-21 SparkUI的初始化

private[spark] class SparkUI private (

    val sc: Option[SparkContext],

    val conf: SparkConf,

    val securityManager: SecurityManager,

    val environmentListener: EnvironmentListener,

    val storageStatusListener: StorageStatusListener,

    val executorsListener: ExecutorsListener,

    val jobProgressListener: JobProgressListener,

    val storageListener: StorageListener,

    var appName: String,

    val basePath: String)

extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")

with Logging {


val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)


/** Initialize all components of the server. */

def initialize() {

    attachTab(new JobsTab(this))

    val stagesTab = new StagesTab(this)

    attachTab(stagesTab)

    attachTab(new StorageTab(this))

    attachTab(new EnvironmentTab(this))

    attachTab(new ExecutorsTab(this))

    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))

    attachHandler(

        createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))

}

initialize()

3.4.4 Spark UI的页面布局与展示

SparkUI究竟是如何实现页面布局及展示的?JobsTab展示所有Job的进度、状态信息,这里我们以它为例来说明。JobsTab会复用SparkUI的killEnabled、SparkContext、job-ProgressListener,包括AllJobsPage和JobPage两个页面,见代码清单3-22。

代码清单3-22 JobsTab的实现

private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {

    val sc = parent.sc

    val killEnabled = parent.killEnabled

    def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)

    val listener = parent.jobProgressListener


    attachPage(new AllJobsPage(this))

    attachPage(new JobPage(this))

}

AllJobsPage由render方法渲染,利用jobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等,见代码清单3-23。

代码清单3-23 AllJobsPage的实现

def render(request: HttpServletRequest): Seq[Node] = {

    listener.synchronized {

        val activeJobs = listener.activeJobs.values.toSeq

        val completedJobs = listener.completedJobs.reverse.toSeq

        val failedJobs = listener.failedJobs.reverse.toSeq

        val now = System.currentTimeMillis


        val activeJobsTable =

            jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)

        val completedJobsTable =

            jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

        val failedJobsTable =

            jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)


        val summary: NodeSeq =

            <div>

                <ul class="unstyled">

                    {if (startTime.isDefined) {

                        // Total duration is not meaningful unless the UI is live

                        <li>

                            <strong>Total Duration: </strong>

                            {UIUtils.formatDuration(now - startTime.get)}

                        </li>

                    }}

                    <li>

                        <strong>Scheduling Mode: </strong>

                        {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}

                    </li>

                    <li>

                        <a href="#active"><strong>Active Jobs:</strong></a>

                        {activeJobs.size}

                    </li>

                    <li>

                        <a href="#completed"><strong>Completed Jobs:</strong></a>

                        {completedJobs.size}

                    </li>

                    <li>

                        <a href="#failed"><strong>Failed Jobs:</strong></a>

                        {failedJobs.size}

                    </li>

                </ul>

            </div>

jobsTable用来生成表格数据,见代码清单3-24。

代码清单3-24 jobsTable处理表格的实现

private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {

    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)


    val columns: Seq[Node] = {

        <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>

        <th>Description</th>

        <th>Submitted</th>

        <th>Duration</th>

        <th class="sorttable_nosort">Stages: Succeeded/Total</th>

        <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>

    }


    <table class="table table-bordered table-striped table-condensed sortable">

        <thead>{columns}</thead>

        <tbody>

            {jobs.map(makeRow)}

        </tbody>

    </table>

}

表格中每行数据又是通过makeRow方法渲染的,参见代码清单3-25。

代码清单3-25 生成表格中的行

def makeRow(job: JobUIData): Seq[Node] = {

    val lastStageInfo = Option(job.stageIds)

        .filter(_.nonEmpty)

        .flatMap { ids => listener.stageIdToInfo.get(ids.max) }

    val lastStageData = lastStageInfo.flatMap { s =>

        listener.stageIdToData.get((s.stageId, s.attemptId))

    }

    val isComplete = job.status == JobExecutionStatus.SUCCEEDED

    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")

    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")

    val duration: Option[Long] = {

        job.startTime.map { start =>

            val end = job.endTime.getOrElse(System.currentTimeMillis())

        end - start

        }

    }

    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")

    val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")

    val detailUrl =

        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)

    <tr>

        <td sorttable_customkey={job.jobId.toString}>

            {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}

        </td>

        <td>

            <div><em>{lastStageDescription}</em></div>

            <a href={detailUrl}>{lastStageName}</a>

        </td>

            <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>

            {formattedSubmissionTime}

        </td>

        <td sorttable_customkey={duration.getOrElse(-1).toString}>{formatted-Duration}</td>

        <td class="stage-progress-cell">

            {job.completedStageIndices.size}/{job.stageIds.size - job.numSkipped-Stages}

            {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}

            {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}

        </td>

        <td class="progress-cell">

            {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,

            failed = job.numFailedTasks, skipped = job.numSkippedTasks,

            total = job.numTasks - job.numSkippedTasks)}

        </td>

    </tr>

}

代码清单3-22中的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中,参见代码清单3-26。

代码清单3-26 WebUITab的实现

private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {

    val pages = ArrayBuffer[WebUIPage]()

    val name = prefix.capitalize


    /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */

    def attachPage(page: WebUIPage) {

        page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")

        pages += page

    }


    /** Get a list of header tabs from the parent UI. */

    def headerTabs: Seq[WebUITab] = parent.getTabs


    def basePath: String = parent.getBasePath

}

JobsTab创建之后,将被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,给每一个page生成org.eclipse.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI,即加入到handlers :ArrayBuffer[ServletContextHandler]和样例类ServerInfo的rootHandler(ContextHandlerCollection)中。SparkUI继承自WebUI,attachTab方法在WebUI中实现,参见代码清单3-27。

代码清单3-27 WebUI的实现

private[spark] abstract class WebUI( securityManager: SecurityManager, port: Int,

        conf: SparkConf, basePath: String = "", name: String = "") extends Logging {


    protected val tabs = ArrayBuffer[WebUITab]()

    protected val handlers = ArrayBuffer[ServletContextHandler]()

    protected var serverInfo: Option[ServerInfo] = None

    protected val localHostName = Utils.localHostName()

    protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)

    private val className = Utils.getFormattedClassName(this)


    def getBasePath: String = basePath

    def getTabs: Seq[WebUITab] = tabs.toSeq

    def getHandlers: Seq[ServletContextHandler] = handlers.toSeq

    def getSecurityManager: SecurityManager = securityManager


    /** Attach a tab to this UI, along with all of its attached pages. */

    def attachTab(tab: WebUITab) {

        tab.pages.foreach(attachPage)

        tabs += tab

    }


    /** Attach a page to this UI. */

    def attachPage(page: WebUIPage) {

        val pagePath = "/" + page.prefix

        attachHandler(createServletHandler(pagePath,

        (request: HttpServletRequest) => page.render(request), securityManager, basePath))

    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",

        (request: HttpServletRequest) => page.renderJson(request), security-Manager, basePath))

}


    /** Attach a handler to this UI. */

    def attachHandler(handler: ServletContextHandler) {

        handlers += handler

        serverInfo.foreach { info =>

            info.rootHandler.addHandler(handler)

            if (!handler.isStarted) {

                handler.start()

        }

    }

}

由于代码清单3-27所在的类中使用import org.apache.spark.ui.JettyUtils._导入了JettyUtils的静态方法,所以createServletHandler方法实际是JettyUtils 的静态方法createServletHandler。createServletHandler实际创建了javax.servlet.http.HttpServlet的匿名内部类实例,此实例实际使用(request: HttpServletRequest) => page.render(request)函数参数来处理请求,进而渲染页面呈现给用户。有关createServletHandler的实现及Jetty的相关信息,请参阅附录C。

3.4.5 SparkUI的启动

SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口,bind方法中主要的代码实现如下。

serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))

JettyUtils的静态方法startJettyServer的实现请参阅附录C。最终启动了Jetty提供的服务,默认端口是4040。

3.5 Hadoop相关配置及Executor环境变量

3.5.1 Hadoop相关配置信息

默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关配置信息的代码如下。

val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

获取的配置信息包括:

将Amazon S3文件系统的AccessKeyId和SecretAccessKey加载到Hadoop的Configuration;

将SparkConf中所有以spark.hadoop.开头的属性都复制到Hadoop的Configuration;

将SparkConf的属性spark.buffer.size复制为Hadoop的Configuration的配置io.file.buffer.size。

如果指定了SPARK_YARN_MODE属性,则会使用YarnSparkHadoopUtil,否则默认为SparkHadoopUtil。

3.5.2 Executor环境变量

对Executor的环境变量的处理,参见代码清单3-28。executorEnvs 包含的环境变量将会在7.2.2节中介绍的注册应用的过程中发送给Master,Master给Worker发送调度后,Worker最终使用executorEnvs提供的信息启动Executor。可以通过配置spark.executor.memory指定Executor占用的内存大小,也可以配置系统变量SPARK_EXECUTOR_MEMORY或者SPARK_MEM对其大小进行设置。

代码清单3-28 Executor环境变量的处理

private[spark] val executorMemory = conf.getOption("spark.executor.memory")

        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))

        .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))

        .map(Utils.memoryStringToMb)

        .getOrElse(512)


    // Environment variables to pass to our executors.

    private[spark] val executorEnvs = HashMap[String, String]()


    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))

        value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty (propKey)))} {

        executorEnvs(envKey) = value

    }

    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>

        executorEnvs("SPARK_PREPEND_CLASSES") = v

    }

    // The Mesos scheduler backend relies on this environment variable to set executor memory.

  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"

    executorEnvs ++= conf.getExecutorEnv


    // Set SPARK_USER for user who is running SparkContext.

    val sparkUser = Option {

        Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))

    }.getOrElse {

        SparkContext.SPARK_UNKNOWN_USER

    }

    executorEnvs("SPARK_USER") = sparkUser

3.6 创建任务调度器TaskScheduler

TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler也可以看做任务调度的客户端。创建TaskScheduler的代码如下。

private[spark] var (schedulerBackend, taskScheduler) =

    SparkContext.createTaskScheduler(this, master)

createTaskScheduler方法会根据master的配置匹配部署模式,创建TaskSchedulerImpl,并生成不同的SchedulerBackend。本章为了使读者更容易理解Spark的初始化流程,故以local模式为例,其余模式将在第7章详解。master匹配local模式的代码如下。

master match {

    case "local" =>

        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)

        val backend = new LocalBackend(scheduler, 1)

        scheduler.initialize(backend)

        (backend, scheduler)

3.6.1 创建TaskSchedulerImpl

TaskSchedulerImpl的构造过程如下:

1)从SparkConf中读取配置信息,包括每个任务分配的CPU数、调度模式(调度模式有FAIR和FIFO两种,默认为FIFO,可以修改属性spark.scheduler.mode来改变)等。

2)创建TaskResultGetter,它的作用是通过线程池(Executors.newFixedThreadPool创建的,默认4个线程,线程名字以task-result-getter开头,线程工厂默认是Executors.default-ThreadFactory)对Worker上的Executor发送的Task的执行结果进行处理。

TaskSchedulerImpl的实现见代码清单3-29。

代码清单3-29 TaskSchedulerImpl的实现

var dagScheduler: DAGScheduler = null

var backend: SchedulerBackend = null

val mapOutputTracker = SparkEnv.get.mapOutputTracker

var schedulableBuilder: SchedulableBuilder = null

var rootPool: Pool = null

// default scheduler is FIFO

private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

val schedulingMode: SchedulingMode = try {

    SchedulingMode.withName(schedulingModeConf.toUpperCase)

} catch {

    case e: java.util.NoSuchElementException =>

        throw new SparkException(s"Unrecognized spark.scheduler.mode: $scheduling-ModeConf")

}


// This is a var so that we can reset it for testing purposes.

private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

TaskSchedulerImpl的调度模式有FAIR和FIFO两种。任务的最终调度实际都是落实到接口SchedulerBackend的具体实现上的。为方便分析,我们先来看看local模式中SchedulerBackend的实现LocalBackend。LocalBackend依赖于LocalActor与ActorSystem进行消息通信。LocalBackend的实现参见代码清单3-30。

代码清单3-30 LocalBackend的实现

private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)

    extends SchedulerBackend with ExecutorBackend {


    private val appId = "local-" + System.currentTimeMillis

    var localActor: ActorRef = null


    override def start() {

        localActor = SparkEnv.get.actorSystem.actorOf(

            Props(new LocalActor(scheduler, this, totalCores)),

            "LocalBackendActor")

    }


    override def stop() {

        localActor ! StopExecutor

    }


    override def reviveOffers() {

        localActor ! ReviveOffers

    }


    override def defaultParallelism() =

        scheduler.conf.getInt("spark.default.parallelism", totalCores)


    override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {

        localActor ! KillTask(taskId, interruptThread)

    }


    override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {

        localActor ! StatusUpdate(taskId, state, serializedData)

    }


    override def applicationId(): String = appId

}

3.6.2 TaskSchedulerImpl的初始化

创建完TaskSchedulerImpl和LocalBackend后,对TaskSchedulerImpl调用方法initialize进行初始化。以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:

1)使TaskSchedulerImpl持有LocalBackend的引用。

2)创建Pool,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息。

3)创建FIFOSchedulableBuilder,FIFOSchedulableBuilder用来操作Pool中的调度队列。

initialize方法的实现见代码清单3-31。

代码清单3-31 TaskSchedulerImpl的初始化

def initialize(backend: SchedulerBackend) {

    this.backend = backend

    rootPool = new Pool("", schedulingMode, 0, 0)

    schedulableBuilder = {

        schedulingMode match {

            case SchedulingMode.FIFO =>

                new FIFOSchedulableBuilder(rootPool)

            case SchedulingMode.FAIR =>

                new FairSchedulableBuilder(rootPool, conf)

        }

    }

    schedulableBuilder.buildPools()

}

3.7 创建和启动DAGScheduler

DAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交之前做一些准备工作,包括:创建Job,将DAG中的RDD划分到不同的Stage,提交Stage,等等。创建DAG-Scheduler的代码如下。

@volatile private[spark] var dagScheduler: DAGScheduler = _

    dagScheduler = new DAGScheduler(this)

DAGScheduler的数据结构主要维护jobId和stageId的关系、Stage、ActiveJob,以及缓存的RDD的partitions的位置信息,见代码清单3-32。

代码清单3-32 DAGScheduler维护的数据结构

private[scheduler] val nextJobId = new AtomicInteger(0)

private[scheduler] def numTotalJobs: Int = nextJobId.get()

private val nextStageId = new AtomicInteger(0)


private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]

private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]

private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]


    // Stages we need to run whose parents aren't done

    private[scheduler] val waitingStages = new HashSet[Stage]

    // Stages we are running right now

    private[scheduler] val runningStages = new HashSet[Stage]

    // Stages that must be resubmitted due to fetch failures

    private[scheduler] val failedStages = new HashSet[Stage]


    private[scheduler] val activeJobs = new HashSet[ActiveJob]


    // Contains the locations that each RDD's partitions are cached on

    private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

    private val failedEpoch = new HashMap[String, Long]


    private val dagSchedulerActorSupervisor =

        env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))


    private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

在构造DAGScheduler的时候会调用initializeEventProcessActor方法创建DAGScheduler-EventProcessActor,见代码清单3-33。

代码清单3-33 DAGSchedulerEventProcessActor的初始化

    private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {

        // blocking the thread until supervisor is started, which ensures eventProcess-Actor is

        // not null before any job is submitted

        implicit val timeout = Timeout(30 seconds)

        val initEventActorReply =

            dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))

        eventProcessActor = Await.result(initEventActorReply, timeout.duration).

            asInstanceOf[ActorRef]

}


initializeEventProcessActor()

这里的DAGSchedulerActorSupervisor主要作为DAGSchedulerEventProcessActor的监管者,负责生成DAGSchedulerEventProcessActor。从代码清单3-34可以看出,DAGScheduler-ActorSupervisor对于DAGSchedulerEventProcessActor采用了Akka的一对一监管策略。DAG-SchedulerActorSupervisor一旦生成DAGSchedulerEventProcessActor,并注册到ActorSystem,ActorSystem就会调用DAGSchedulerEventProcessActor的preStart,taskScheduler于是就持有了dagScheduler,见代码清单3-35。从代码清单3-35我们还看到DAG-SchedulerEventProcessActor所能处理的消息类型,比如JobSubmitted、BeginEvent、CompletionEvent等。DAGScheduler-EventProcessActor接受这些消息后会有不同的处理动作。在本章,读者只需要理解到这里即可,后面章节用到时会详细分析。

代码清单3-34 DAGSchedulerActorSupervisor的监管策略

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)

    extends Actor with Logging {


    override val supervisorStrategy =

        OneForOneStrategy() {

            case x: Exception =>

                logError("eventProcesserActor failed; shutting down SparkContext", x)

                try {

                    dagScheduler.doCancelAllJobs()

                } catch {

                    case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)

                }

                dagScheduler.sc.stop()

                Stop

    }


def receive = {

        case p: Props => sender ! context.actorOf(p)

        case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")

    }

}

代码清单3-35 DAGSchedulerEventProcessActor的实现

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGS-cheduler)

    extends Actor with Logging {

    override def preStart() {

        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)

    }

    /**

    * The main event loop of the DAG scheduler.

    */

    def receive = {

        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>

            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,

                listener, properties)

        case StageCancelled(stageId) =>

            dagScheduler.handleStageCancellation(stageId)

        case JobCancelled(jobId) =>

            dagScheduler.handleJobCancellation(jobId)

        case JobGroupCancelled(groupId) =>

            dagScheduler.handleJobGroupCancelled(groupId)

        case AllJobsCancelled =>

            dagScheduler.doCancelAllJobs()

        case ExecutorAdded(execId, host) =>

            dagScheduler.handleExecutorAdded(execId, host)

        case ExecutorLost(execId) =>

            dagScheduler.handleExecutorLost(execId, fetchFailed = false)

        case BeginEvent(task, taskInfo) =>

            dagScheduler.handleBeginEvent(task, taskInfo)

        case GettingResultEvent(taskInfo) =>

            dagScheduler.handleGetTaskResult(taskInfo)

        case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

            dagScheduler.handleTaskCompletion(completion)

        case TaskSetFailed(taskSet, reason) =>

            dagScheduler.handleTaskSetFailed(taskSet, reason)

        case ResubmitFailedStages =>

            dagScheduler.resubmitFailedStages()

}

override def postStop() {

    // Cancel any active jobs in postStop hook

    dagScheduler.cleanUpAfterSchedulerStop()

}

3.8 TaskScheduler的启动

3.6节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下。

taskScheduler.start()

TaskScheduler在启动的时候,实际调用了backend的start方法。

override def start() {

        backend.start()

    }

以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-30所示。

3.8.1 创建LocalActor

创建LocalActor的过程主要是构建本地的Executor,见代码清单3-36。

代码清单3-36 LocalActor的实现

private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,

    private val totalCores: Int) extends Actor with ActorLogReceive with Logging {

    import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

    private var freeCores = totalCores

    private val localExecutorId = SparkContext.DRIVER_IDENTIFIER

    private val localExecutorHostname = "localhost"


    val executor = new Executor(

        localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)


    override def receiveWithLogging = {

        case ReviveOffers =>

            reviveOffers()


        case StatusUpdate(taskId, state, serializedData) =>

            scheduler.statusUpdate(taskId, state, serializedData)

            if (TaskState.isFinished(state)) {

                freeCores += scheduler.CPUS_PER_TASK

                reviveOffers()

            }


        case KillTask(taskId, interruptThread) =>

            executor.killTask(taskId, interruptThread)


        case StopExecutor =>

            executor.stop()

    }


}

Executor的构建,见代码清单3-37,主要包括以下步骤。

1)创建并注册ExecutorSource。ExecutorSource是做什么的呢?笔者将在3.8.2节详细介绍。

2)获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的ActorSystem的端口号。

3)创建并注册ExecutorActor。ExecutorActor负责接受发送给Executor的消息。

4)urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。

5)创建Executor执行Task的线程池。此线程池用于执行任务。

6)启动Executor的心跳线程。此线程用于向Driver发送心跳。

此外,还包括Akka发送消息的帧大小(10 485 760字节)、结果总大小的字节限制(1 073 741 824字节)、正在运行的task的列表、设置serializer的默认ClassLoader为创建的ClassLoader等。

代码清单3-37 Executor的构建

    val executorSource = new ExecutorSource(this, executorId)

private val env = {

        if (!isLocal) {

            val port = conf.getInt("spark.executor.port", 0)

            val _env = SparkEnv.createExecutorEnv(

                conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)

            SparkEnv.set(_env)

            _env.metricsSystem.registerSource(executorSource)

            _env.blockManager.initialize(conf.getAppId)

            _env

        } else {

            SparkEnv.get

        }

    }


    private val executorActor = env.actorSystem.actorOf(

        Props(new ExecutorActor(executorId)), "ExecutorActor")


    private val urlClassLoader = createClassLoader()

    private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

    env.serializer.setDefaultClassLoader(urlClassLoader)


    private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

    private val maxResultSize = Utils.getMaxResultSize(conf)


    val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

    startDriverHeartbeater()

3.8.2 ExecutorSource的创建与注册

ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、thread-pool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码清单3-38。Metric接口的具体实现,参考附录D。

代码清单3-38 ExecutorSource的实现

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

    private def fileStats(scheme: String) : Option[FileSystem.Statistics] =

        FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption


    private def registerFileSystemStat[T](

            scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {

        metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {

            override def getValue: T = fileStats(scheme).map(f).getOrElse (defaultValue)

        })

    }

    override val metricRegistry = new MetricRegistry()

    override val sourceName = "executor"


metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {

        override def getValue: Int = executor.threadPool.getActiveCount()

    })

    metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {

        override def getValue: Long = executor.threadPool.getCompletedTaskCount()

    })

    metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {

        override def getValue: Int = executor.threadPool.getPoolSize()

    })

    metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {

        override def getValue: Int = executor.threadPool.getMaximumPoolSize()

    })


    // Gauge for file system stats of this executor

    for (scheme <- Array("hdfs", "file")) {

        registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)

        registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)

        registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)

        registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)

        registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)

    }

}

创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将Source注册到MetricRegistry,见代码清单3-39。关于MetricRegistry,具体参阅附录D。

代码清单3-39 MetricsSystem注册Source的实现

def registerSource(source: Source) {

    sources += source

    try {

        val regName = buildRegistryName(source)

        registry.register(regName, source.metricRegistry)

    } catch {

        case e: IllegalArgumentException => logInfo("Metrics already registered", e)

    }

}

3.8.3 ExecutorActor的构建与注册

ExecutorActor很简单,当接收到SparkUI发来的消息时,将所有线程的栈信息发送回去,代码实现如下。

override def receiveWithLogging = {

    case TriggerThreadDump =>

        sender ! Utils.getThreadDump()

}

3.8.4 Spark自身ClassLoader的创建

获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下加载,最后创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,见代码清单3-40。

代码清单3-40 Spark自身ClassLoader的创建

private def createClassLoader(): MutableURLClassLoader = {

    val currentLoader = Utils.getContextOrSparkClassLoader


    val urls = currentJars.keySet.map { uri =>

        new File(uri.split("/").last).toURI.toURL

    }.toArray

    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)

    userClassPathFirst match {

        case true => new ChildExecutorURLClassLoader(urls, currentLoader)

        case false => new ExecutorURLClassLoader(urls, currentLoader)

    }

}

Utils.getContextOrSparkClassLoader的实现见附录A。ExecutorURLClassLoader或者Child-ExecutorURLClassLoader实际上都继承了URLClassLoader,见代码清单3-41。

代码清单3-41 ChildExecutorURLClassLoader和ExecutorLIRLClassLoader的实现

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)

    extends MutableURLClassLoader {


    private object userClassLoader extends URLClassLoader(urls, null){

        override def addURL(url: URL) {

            super.addURL(url)

        }

    override def findClass(name: String): Class[_] = {

        super.findClass(name)

    }

}


private val parentClassLoader = new ParentClassLoader(parent)


override def findClass(name: String): Class[_] = {

    try {

        userClassLoader.findClass(name)

    } catch {

        case e: ClassNotFoundException => {

            parentClassLoader.loadClass(name)

        }

    }

}


    def addURL(url: URL) {

        userClassLoader.addURL(url)

    }


    def getURLs() = {

        userClassLoader.getURLs()

    }

}


private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)

    extends URLClassLoader(urls, parent) with MutableURLClassLoader {


    override def addURL(url: URL) {

        super.addURL(url)

    }

}

如果需要REPL交互,还会调用addReplClassLoaderIfNeeded创建replClassLoader,见代码清单3-42。

代码清单3-42 addReplClassLoaderIfNeeded的实现

private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {

    val classUri = conf.get("spark.repl.class.uri", null)

    if (classUri != null) {

        logInfo("Using REPL class URI: " + classUri)

        val userClassPathFirst: java.lang.Boolean =

        conf.getBoolean("spark.files.userClassPathFirst", false)

    try {

        val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")

            .asInstanceOf[Class[_ <: ClassLoader]]

        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],

            classOf[ClassLoader], classOf[Boolean])

        constructor.newInstance(conf, classUri, parent, userClassPathFirst)

    } catch {

        case _: ClassNotFoundException =>

            logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")

            System.exit(1)

            null

        }

    } else {

        parent

    }

}

3.8.5 启动Executor的心跳线程

Executor的心跳由startDriverHeartbeater启动,见代码清单3-43。Executor心跳线程的间隔由属性spark.executor.heartbeatInterval配置,默认是10 000毫秒。此外,超时时间是30秒,超时重试次数是3次,重试间隔是3000毫秒,使用actorSystem.actorSelection (url)方法查找到匹配的Actor引用, url是akka.tcp://sparkDriver@ $driverHost:$driverPort/user/Heartbeat-Receiver,最终创建一个运行过程中,每次会休眠10 000~20 000毫秒的线程。此线程从runningTasks获取最新的有关Task的测量信息,将其与executorId、blockManagerId封装为Heartbeat消息,向HeartbeatReceiver发送Heartbeat消息。

代码清单3-43 启动Executor的心跳线程

def startDriverHeartbeater() {

    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)

    val timeout = AkkaUtils.lookupTimeout(conf)

    val retryAttempts = AkkaUtils.numRetries(conf)

    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)

    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf,env.actorSystem)

    val t = new Thread() {

        override def run() {

            // Sleep a random interval so the heartbeats don't end up in sync

            Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

            while (!isStopped) {

                val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()

                val curGCTime = gcTime

                for (taskRunner <- runningTasks.values()) {

                    if (!taskRunner.attemptedTask.isEmpty) {

                        Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>

                            metrics.updateShuffleReadMetrics

                            metrics.jvmGCTime = curGCTime - taskRunner.startGCTime

                            if (isLocal) {

                                val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))

                                tasksMetrics += ((taskRunner.taskId, copiedMetrics))

                        } else {

                            // It will be copied by serialization

                            tasksMetrics += ((taskRunner.taskId, metrics))

                        }

                    }

                }

            }

            val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)

            try {

                val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,

                    retryAttempts, retryIntervalMs, timeout)

                if (response.reregisterBlockManager) {

                    logWarning("Told to re-register on heartbeat")

                    env.blockManager.reregister()

                }

            } catch {

                case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)

            }

Thread.sleep(interval)

            }

        }

    }

    t.setDaemon(true)

    t.setName("Driver Heartbeater")

    t.start()

}

这个心跳线程的作用是什么呢?其作用有两个:

更新正在处理的任务的测量信息;

通知BlockManagerMaster,此Executor上的BlockManager依然活着。

下面对心跳线程的实现详细分析下,读者可以自行选择是否需要阅读。

初始化TaskSchedulerImpl后会创建心跳接收器HeartbeatReceiver。HeartbeatReceiver接收所有分配给当前Driver Application的Executor的心跳,并将Task、Task计量信息、心跳等交给TaskSchedulerImpl和DAGScheduler作进一步处理。创建心跳接收器的代码如下。

private val heartbeatReceiver = env.actorSystem.actorOf(

    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

HeartbeatReceiver在收到心跳消息后,会调用TaskScheduler的executorHeartbeatReceived方法,代码如下。

override def receiveWithLogging = {

    case Heartbeat(executorId, taskMetrics, blockManagerId) =>

        val response = HeartbeatResponse(

            !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))

        sender ! response

  }

executorHeartbeatReceived的实现代码如下。

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {

    taskMetrics.flatMap { case (id, metrics) =>

        taskIdToTaskSetId.get(id)

            .flatMap(activeTaskSets.get)

            .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))

    }

}

dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

这段程序通过遍历taskMetrics,依据taskIdToTaskSetId和activeTaskSets找到TaskSet-Manager。然后将taskId、TaskSetManager.stageId、TaskSetManager .taskSet.attempt、TaskMetrics封装到类型为Array[(Long, Int, Int, TaskMetrics)]的数组metricsWithStageIds中。最后调用了dag-Scheduler的executorHeartbeatReceived方法,其实现如下。

listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))

implicit val timeout = Timeout(600 seconds)


Await.result(

    blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),

    timeout.duration).asInstanceOf[Boolean]

dagScheduler将executorId、metricsWithStageIds封装为SparkListenerExecutorMetricsUpdate事件,并post到listenerBus中,此事件用于更新Stage的各种测量数据。最后给BlockManagerMaster持有的BlockManagerMasterActor发送BlockManagerHeartbeat消息。BlockManagerMasterActor在收到消息后会匹配执行heartbeatReceived方法(参见4.3.1节)。heartbeatReceived最终更新BlockManagerMaster对BlockManger的最后可见时间(即更新Block-ManagerId对应的BlockManagerInfo的_lastSeenMs,见代码清单3-44)。

代码清单3-44 BlockManagerMasterActor的心跳处理

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {

    if (!blockManagerInfo.contains(blockManagerId)) {

        blockManagerId.isDriver && !isLocal

    } else {

        blockManagerInfo(blockManagerId).updateLastSeenMs()

        true

    }

}

local模式下Executor的心跳通信过程,可以用图3-3来表示。

在非local模式中,Executor发送心跳的过程是一样的,主要的区别是Executor进程与Driver不在同一个进程,甚至不在同一个节点上。

接下来会初始化块管理器BlockManager,代码如下。


图3-3 Executor的心跳通信过程

env.blockManager.initialize(applicationId)

具体的初始化过程,请参阅第4章。

3.9 启动测量系统MetricsSystem

MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:

Instance:指定了谁在使用测量系统;

Source:指定了从哪里收集测量数据;

Sink:指定了往哪里输出测量数据。

Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。

Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。

Spark中使用MetricsServlet作为默认的Sink。

MetricsSystem的启动代码如下。

val metricsSystem = env.metricsSystem

    metricsSystem.start()

MetricsSystem的启动过程包括以下步骤:

1)注册Sources;

2)注册Sinks;

3)给Sinks增加Jetty的ServletContextHandler。

MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attach-Handler将它们绑定到Spark UI上。

metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler (handler)))

3.9.1 注册Sources

registerSources方法用于注册Sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册Sources的过程分为以下步骤:

1)从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。

2)用正则匹配Driver的Properties中以source.开头的属性。然后将属性中的Source反射得到的实例加入ArrayBuffer[Source]。

3)将每个source的metricRegistry(也是MetricSet的子类型)注册到Concurrent-Map<String, Metric> metrics。这里的registerSource方法已在3.8.2节讲解过。

代码清单3-45 MetricsSystem注册Sources的实现

private def registerSources() {

    val instConfig = metricsConfig.getInstance(instance)

    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)


    // Register all the sources related to instance

    sourceConfigs.foreach { kv =>

        val classPath = kv._2.getProperty("class")

        try {

            val source = Class.forName(classPath).newInstance()

            registerSource(source.asInstanceOf[Source])

        } catch {

            case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)

        }

    }

}

3.9.2 注册Sinks

registerSinks方法用于注册Sinks,即告诉测量系统MetricsSystem往哪里输出测量数据,它的实现见代码清单3-46。注册Sinks的步骤如下:

1)从Driver的Properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json},将其转换为Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。

2)将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。

代码清单3-46 MetricsSystem注册Sinks的实现

private def registerSinks() {

    val instConfig = metricsConfig.getInstance(instance)

    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>

        val classPath = kv._2.getProperty("class")

        if (null != classPath) {

            try {

                val sink = Class.forName(classPath)

                .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])

                .newInstance(kv._2, registry, securityMgr)

            if (kv._1 == "servlet") {

                metricsServlet = Some(sink.asInstanceOf[MetricsServlet])

            } else {

                sinks += sink.asInstanceOf[Sink]

            }

            } catch {

                case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)

            }

        }

    }

}

3.9.3 给Sinks增加Jetty的ServletContextHandler

为了能够在SparkUI(网页)访问到测量数据,所以需要给Sinks增加Jetty的Servlet-ContextHandler,这里主要用到MetricsSystem的getServletHandlers方法实现如下。

def getServletHandlers = {

    require(running, "Can only call getServletHandlers on a running MetricsSystem")

    metricsServlet.map(_.getHandlers).getOrElse(Array())

}

可以看到调用了metricsServlet的getHandlers,其实现如下。

def getHandlers = Array[ServletContextHandler](

    createServletHandler(servletPath,

        new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)

)

最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由get-MetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI(creatServlethandler与attachHandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。

http://localhost:4040/metrics/applications/json。

http://localhost:4040/metrics/json。

http://localhost:4040/metrics/master/json。

3.10 创建和启动ExecutorAllocationManager

ExecutorAllocationManager用于对已分配的Executor进行管理,创建和启动Executor-AllocationManager的代码如下。

private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =

    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {

        Some(new ExecutorAllocationManager(this, listenerBus, conf))

    } else {

        None

    }

executorAllocationManager.foreach(_.start())

默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加、删除Executor。并且通过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-47。

代码清单3-47 ExecutorAllocationManager的关键代码

private val intervalMillis: Long = 100

private var clock: Clock = new RealClock

private val listener = new ExecutorAllocationListener

def start(): Unit = {

    listenerBus.addListener(listener)

    startPolling()

}


private def startPolling(): Unit = {

    val t = new Thread {

        override def run(): Unit = {

            while (true) {

                try {

                    schedule()

                } catch {

                    case e: Exception => logError("Exception in dynamic executor allocation thread!", e)

                }

                Thread.sleep(intervalMillis)

            }

        }

    }

    t.setName("spark-dynamic-executor-allocation")

    t.setDaemon(true)

    t.start()

}

根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。

  listenerBus.start()

3.11 ContextCleaner的创建与启动

ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

private[spark] val cleaner: Option[ContextCleaner] = {

    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {

        Some(new ContextCleaner(this))

    } else {

        None

    }

}

cleaner.foreach(_.start())

ContextCleaner的组成如下:

referenceQueue:缓存顶级的AnyRef引用;

referenceBuffer:缓存AnyRef的虚引用;

listeners:缓存清理工作的监听器数组;

cleaningThread:用于具体清理工作的线程。

ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-48。

代码清单3-48 keep Cleaning的实现

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

    while (!stopped) {

        try {

            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))

                .map(_.asInstanceOf[CleanupTaskWeakReference])

            // Synchronize here to avoid being interrupted on stop()

            synchronized {

                reference.map(_.task).foreach { task =>

                logDebug("Got cleaning task " + task)

                referenceBuffer -= reference.get

                task match {

                    case CleanRDD(rddId) =>

                        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)

                    case CleanShuffle(shuffleId) =>

                        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)

                    case CleanBroadcast(broadcastId) =>

                        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)

                    }

                }

            }

        } catch {

            case ie: InterruptedException if stopped => // ignore

            case e: Exception => logError("Error in cleaning thread", e)

        }

    }

}

3.12 Spark环境更新

在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。

postEnvironmentUpdate()

postApplicationStart()

SparkContext初始化过程中,如果设置了spark.jars属性, spark.jars指定的jar包将由addJar方法加入httpFileServer的jarDir变量指定的路径下。spark.files指定的文件将由addFile方法加入httpFileServer的fileDir变量指定的路径下。见代码清单3-49。

代码清单3-49 依赖文件处理

val jars: Seq[String] =

    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten


val files: Seq[String] =

    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten


// Add each JAR given through the constructor

    if (jars != null) {

        jars.foreach(addJar)

    }


    if (files != null) {

        files.foreach(addFile)

}

httpFileServer的addFile和addJar方法,见代码清单3-50。

代码清单3-50 HttpFileServer提供对依赖文件的访问

def addFile(file: File) : String = {

    addFileToDir(file, fileDir)

    serverUri + "/files/" + file.getName

}


def addJar(file: File) : String = {

    addFileToDir(file, jarDir)

    serverUri + "/jars/" + file.getName

}


def addFileToDir(file: File, dir: File) : String = {

    if (file.isDirectory) {

        throw new IllegalArgumentException(s"$file cannot be a directory.")

    }

    Files.copy(file, new File(dir, file.getName))

    dir + "/" + file.getName

}

postEnvironmentUpdate的实现见代码清单3-51,其处理步骤如下:

1)通过调用SparkEnv的方法environmentDetails最终影响环境的JVM参数、Spark 属性、系统属性、classPath等,参见代码清单3-52。

2)生成事件SparkListenerEnvironmentUpdate,并post到listenerBus,此事件被Environ-mentListener监听,最终影响EnvironmentPage页面中的输出内容。

代码清单3-51 postEnvironmentUpdate的实现

private def postEnvironmentUpdate() {

    if (taskScheduler != null) {

        val schedulingMode = getSchedulingMode.toString

        val addedJarPaths = addedJars.keys.toSeq

        val addedFilePaths = addedFiles.keys.toSeq

        val environmentDetails =

            SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)

        val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)

        listenerBus.post(environmentUpdate)

    }

}

代码清单3-52 environmentDetails的实现

val jvmInformation = Seq(

    ("Java Version", s"$javaVersion ($javaVendor)"),

    ("Java Home", javaHome),

    ("Scala Version", versionString)

).sorted


val schedulerMode =

    if (!conf.contains("spark.scheduler.mode")) {

        Seq(("spark.scheduler.mode", schedulingMode))

    } else {

        Seq[(String, String)]()

    }

val sparkProperties = (conf.getAll ++ schedulerMode).sorted


// System properties that are not java classpaths

val systemProperties = Utils.getSystemProperties.toSeq

val otherProperties = systemProperties.filter { case (k, _) =>

    k != "java.class.path" && !k.startsWith("spark.")

}.sorted


// Class paths including all added jars and files

val classPathEntries = javaClassPath

    .split(File.pathSeparator)

    .filterNot(_.isEmpty)

    .map((_, "System Classpath"))

val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))

val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted


Map[String, Seq[(String, String)]](

    "JVM Information" -> jvmInformation,

    "Spark Properties" -> sparkProperties,

    "System Properties" -> otherProperties,

    "Classpath Entries" -> classPaths)

}

postApplicationStart方法很简单,只是向listenerBus发送了SparkListenerApplicationStart事件,代码如下。

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))

3.13 创建DAGSchedulerSource和BlockManagerSource

在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的post-StartHook方法,其目的是为了等待backend就绪,见代码清单3-53。postStartHook的实现见代码清单3-54。

创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage. runningStages、stage. waiting-Stages、stage. allJobs、stage. activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpace-Used_MB。

代码清单3-53 创建DAGSchedulerSource和BlockManagerSource

    taskScheduler.postStartHook()


    private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)

    private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)


private def initDriverMetrics() {

    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)

    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)

}


initDriverMetrics()

代码清单3-54 postStartHook的实现

override def postStartHook() {

        waitBackendReady()

    }


private def waitBackendReady(): Unit = {

    if (backend.isReady) {

        return

    }

    while (!backend.isReady) {

        synchronized {

            this.wait(100)

        }

    }

}

3.14 将SparkContext标记为激活

SparkContext初始化的最后将当前SparkContext的状态从contextBeingConstructed(正在构建中)改为activeContext(已激活),代码如下。

SparkContext.setActiveContext(this, allowMultipleContexts)

setActiveContext方法的实现如下。

private[spark] def setActiveContext(

        sc: SparkContext,

        allowMultipleContexts: Boolean): Unit = {

    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {

        assertNoOtherContextIsRunning(sc, allowMultipleContexts)

        contextBeingConstructed = None

        activeContext = Some(sc)

    }

}

3.15 小结

回顾本章, Scala与Akka的基于Actor的并发编程模型给人的印象深刻。listenerBus对于监听器模式的经典应用看来并不复杂,希望读者朋友能应用到自己的产品开发中去。此外,使用Netty所提供的异步网络框架构建的Block传输服务,基于Jetty构建的内嵌web服务(HTTP文件服务器和SparkUI),基于codahale提供的第三方测量仓库创建的测量系统,Executor中的心跳实现等内容,都值得借鉴。


相关文章
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2041 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
941 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1292 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1627 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
899 0
|
分布式计算 Spark Hadoop
Spark2.4.0源码分析之WorldCount Stage提交(DAGScheduler)(六)
- 理解ShuffuleMapStage是如何转化为ShuffleMapTask并作为TaskSet提交 - 理解ResultStage是如何转化为ResultTask并作为TaskSet提交
1185 0
|
分布式计算 Apache Spark
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
理解FinalStage是如何按stage从前到后依次提交顺序
2234 0
|
缓存 分布式计算 Scala
Spark2.4.0源码分析之WorldCount Stage划分(DAGScheduler)(四)
理解FinalStage的转化(即Stage的划分)
900 0
|
分布式计算 Spark
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
理解DAG事件循环处理器处理事件流程
1049 0
|
分布式计算
Spark2.4.0源码分析之WorldCount 触发作业提交(二)
Final RDD作为参数,通过RDD.collect()函数触发作业提交
1379 0