一、解密SparkStreaming另类在线实验
二、瞬间理解SparkStreaming本质
Spark源码定制,自己动手改进Spark源码,通常在电信、金融、教育、医疗、互联网等领域都有自己不同的业务,如果Sprak官方版本没有你需要的业务功能,你自己可以定制、扩展Spark的功能,满足公司的业务需要。
选择SparkStreaming框架源码研究、二次开发的原因
1、Spark起初只有Spark Core基础框架没有其他的子框架(Spark SQL、Spark Streaming、Spark ML、Spark Graphx、Spark R等),在其后加入了各种子框架来满足不同的需求。而分析这些子框架发现,选择Spark Streaming框架来研究,是最明智的选择,通过研究Spark Core上的Spark Streaming子框架,是迈向精通Spark力量源泉和解决所有问题之道。
2、Spark有很多子框架,我们选择Spark Streaming而为什么不用其他框架?
Spark SQL涉及了很多SQL语法细节的解析和优化,当然分析其解析、优化从而集中精力去研究Spark而言是一件重要的事情,但不是最重要的事情,所以Spark SQL不太适合作为具体的子框架值得我们去研究。
目前Spark R现在不成熟,支撑功能有限。
图计算,从各版本演进而言Graphx几乎没有改进,这种趋势,Graphx是不是已经发展基本到尽头了;另外图计算而言有很多数学级别的算法,而要把Spark做到极致,数学对我们来说重要,但对于研究而言不是最重要的。
Mechine Learning在封装了Vector向量、Metrics构建了众多的算法库,从而涉及了太多的数学知识,所有选择ML其实也不是太好的选择。
最后筛选出SparkStreaming子框架才是最佳的研究切入黄金点。
Spark Streaming是流式计算框架,一切数据如果和流处理不相关的话都是无效的数据。流式处理才是我们真正对大数据的初步印象,数据流进来它立即会给我们一个反馈,而不是数据挖掘、图计算。Spark强悍地方是使用流处理可以完美的结合使用机器学习的成果、图计算的成果、Spark sql或者说spark R的成果。这得益于Spark的一体化、多元化的技术架构的设计,也就是说通过一个技术堆栈可以调用技术堆栈中所有的东西,根本不需要任何的设置,这是Spark无可匹敌之处也是SparkStreaming一统天下的根源。这个时代的流处理单打独斗是不行了,SparkStreaming和多个兄弟框架联合起来,无可匹敌。如果你精通SparkStreaming的话,恭喜你,因为SparkStreaming以及背后的几个兄弟框架正好展示了Spark大数据无穷的魅力。
整个Spark的所有应用程序,哪些程序容易出问题?肯定是SparkStreaming程序是最容易出问题的,因为数据是不断流入的,ss要动态的控制数据的流入、作业的切分、数据的处理,所以最容易出问题,但最容易出问题的地方同时也是最受关注的地方也是展示大数据最神奇魅力的地方。这些特色结合的话,也是最需要技术人才的地方。关注SparkStreaming在Spark的发展,你会很清晰知道,ss跟其他自框架不同之处,ss很象SparkCore上的一个应用程序。
正如世界万物发展一样,任何技术都有其关键点或转折点,SparkStreaming运行在SparkCore上,所以很多性能调优都是建立在SparkCore上的;Spark是大数据的龙脉,SparkStreaming是龙脉的穴位。
接下来感受一下龙脉和穴位
研究SparkStreaming时,有困惑你的东西,SparkStreaming数据不断流进来,根据batchInterval时间片不断生成Job,并将Job提交集群处理,如果能清晰的看到数据的流入和数据的处理,你心里会很很踏实。
如何能清晰的看到数据的处理过程呢?只需要一个小技巧:就是把SparkStreaming中的batchInterval放的足够大,例如说从30秒调整为1分钟一次batch,或者5分钟一次batch,你会很清晰的看到整个流程序的运行过程。
以广告点击在线黑名单的过滤为例
调整时间维度:
我们把时间从30秒调至300秒:
读取SparkStreaming Socket端口的数据:
打包程序发布至集群部署环境:
检查集群进程:
通过webui检查HDFS启动情况:
启动history-server监控进程及其对应的webui监控界面:
至此整个集群环境启动一切OK。
开始启动运行SparkStreaming应用程序
启动外部请求SparkStreaming服务端口的客户端:
输入待处理的数据流:
看结果如下:
看webui控制台:
点击链接进入后产生了0~4个Job:
有意思的是SparkStreaming应用程序启动实际执行的是一个Job,但真正执行的是5个Job,其分别是Receiver Job,Output Job,Output Job,Output Job,Start Job.
第 0 个Job是不是我们逻辑中的代码?不是的,不是reduceByKey的执行结果Job,如下图:
SparkStreaming在启动的过程中会自动启动一些Job,如start操作:
SparkStreaming最像一个应用程序,就算是算一次,也执行了好几个Job,就像spark应用程序一样,可以启动不同的Job完成不同的功能。
继续看Job1:
通过Job告诉你内幕:通过追踪Receiver发现其会产生makeRDD,实际上作为整个Job独立的一个stage,只在一台机器上执行,而且执行了1.5分钟,刚才启动SparkStreaming,没有任务执行1.5分钟的,如下图:
思考一下什么东西执行了1.5分钟,而整个Job只运行了2分钟?
答案就是ReceiverTracker接收器运行的,它需要接收流入的数据。这个Job就是Receiver,并且执行了1.5分钟,而启动的Receiver就是一个Job。
结论:
SparkStreaming启动Receiver的是一个Job,在具体的Cluster的Worker上的executor中,启动Receiver是通过Job启动的。通过作业的运行时间看出,整个SparkStreaming运行的时间是2分钟,其中有个Job运行了1.5分钟,这个Job就是Receiver,其实指的是Receiver启动运行的时间,Receiver是在executor中运行的,也就是说SparkStreaming框架在启动Recevier是通过Job启动的。而且Receiver(可以启动多个receiver接收数据)就是在一个executor中运行且通过一个Task去接收我们的数据:
从这个角度讲Receiver接收数据和普通job有什么区别?没有区别。转过来给我们启发:在一个Spark application中可以启动很多的job,这些job之间可以相互配合。例如:SparkStreaming框架默认启动job给你接收数据,然后为后续的处理做准备,为你写复杂的应用程序奠定了一个良好的基础。这就是你写非常复杂的Spark应用程序的黄金切入点,复杂的程序一般都是有多个job构成的。
上图的Process_local即内存节点,SparkStreaming在默认情况下接收数据是memory_and_disk_ser_2的方式,也就是说接收的数据量比较少内存能存下的话默认情况下是不会存储磁盘的,在这里直接使用内存中。
看下第0个job:
在4个worker上启动4个executor,是在最大化的使用计算资源,通过第1个job 不断接收数据。
这里处理数据有shuffle read,shuffle write,通过socketTextStream即rdd,这里叫blockRdd,而且blockrdd来自于socketTextStream的方法:
其实是inputStream帮我们在固定时间间隔内会产生固定的rdd,接收数据是在一个executor的task中接收的,但现在处理数据是transform操作发生在executor里面的发生在4个executor,这个结果告诉我们在一台机器上接收数据,但实际上是在四台机器上处理数据的。最大化利用集群资源处理数据。SparkStreaming程序执行时就是一个batch级别的Job,里面做了很多事情。整个处理,其实只有一个Job真正在执行,但产生很多Job相互协调来完成复杂的业务处理,这个情况告诉我们SparkStreaming并不是网络、博客、书籍、官网上讲的那么简单。
SparkStreaming本身是随着流进来的数据按照时间为单位生成job,然后触发job在Cluster上执行的流式处理的引擎,它本身是加上以时间为维度的批处理,实例中以300秒为会产生一批数据,基于这一批数据会生成rdd,基于rdd会触发job,rdd的生成、job的触发,都是SparkStreaming框架去做的。SparkStreaming中有个至关只要的东西叫DStream,我们每隔一定时间都会生成rdd,产生rdd的依赖或触发job具体的执行。每隔时间,所以弄了一个DStream,DStream代表时空的概念,时间为维度,随着时间的推进不断产生rdd,实际上DStream就是rdd的集合,只不过是有时间的先后顺序;空间维度实际上是DStream的处理层面,我们对DStream进行处理实际上是对DStream里面的每个rdd的处理。整个时空是一个很大的概念,时间固定的话,可以锁定对空间的操作,操作其实就是transform,对DStream的操作会构建DStream Graph。
总结:
随着时间为维度有个DStream Graph,同时在时间维度下有个空间维度,空间维度就是操作,空间维度确定的情况下时间不断推进的时候他就不断把空间维度的DStream Graph实例化成rdd的graph,然后触发具体的job进行执行。
一、解密SparkStreaming运行机制
二、解密SparkStreaming架构
SparkStreaming运行时更像SparkCore上的应用程序,SparkStreaming程序启动后会启动很多job,每个batchIntval、windowByKey的job、框架运行启动的job。例如,Receiver启动时也启动了job,此job为其他job服务,所以需要做复杂的Spark程序,往往多个job之间互相配合。SparkStreaming是最复杂的应用程序,如果对SparkStreaming了如指掌的话,做其他的Spark应用程序没有任何问题。看下官网:Spark sql,SparkStreaming,Spark ml,Spark graphx子框架都是后面开发出来的,我们要洞悉Spark Core 的话,SparkStreaming是最好的切入方式。
进入Spark官网,可以看到SparkCore和其他子框架的关系:
SparkStreaming启动后,数据不断通过inputStream流进来,根据时间划分成不同的job、就是batchs of input data,每个job有一序列rdd的依赖。Rdd的依赖有输入的数据,所以这里就是不同的rdd依赖构成的batch,这些batch是不同的job,根据spark引擎来得出一个个结果。DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,转过来是对其内部的RDD操作。
我是使用SparkCore 编程都是基于rdd编程,rdd间有依赖关系,如下图右侧的依赖关系图,SparkStreaming运行时,根据时间为维度不断的运行。Rdd的dag依赖是空间维度,而DStream在rdd的基础上加上了时间维度,所以构成了SparkStreaming的时空维度。
SparkStreaming在rdd的基础上增加了时间维度,运行时可以清晰看到jobscheduler、mappartitionrdd、shuffledrdd、blockmaanager等等,这些都是SparkCore的内容,而DStream、jobgenerator、socketInputDstream等等都是SparkStreaming的内容,如下图运行过程可以很清晰的看到:
现在通过SparkStreaming的时空维度来细致说明SparkStreaming运行机制
时间维度:按照固定时间间隔不断地产生job对象,并在集群上运行:
包含有batch interval,窗口长度,窗口滑动时间等
空间维度:代表的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的:
1、需要RDD,DAG的生成模板
2、TimeLine的job控制器、
3、InputStream和outputstream代表的数据输入输出
4、具体Job运行在Spark Cluster之上,此时系统容错就至关重要
5、事务处理,在处理出现奔溃的情况下保证Exactly once的事务语义一致性
随着时间的流动,基于DStream Graph不断生成RDD Graph,也就是DAG的方式生成job,并通过Job Scheduler的线程池的方式提交给Spark Cluster不断的执行,
由上图可知,RDD 与 DStream之间的关系如下:
1、RDD是物理级别的,而 DStream 是逻辑级别的;
2、DStream是RDD的封装模板类,是RDD进一步的抽象;
3、DStream要依赖RDD进行具体的数据计算;
Spark Streaming源码解析
1、StreamingContext方法中调用JobScheduler的start方法:
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master", 9999)
......//业务处理代码略
ssc.start()
ssc.awaitTermination()
我们进入JobScheduler start方法的内部继续分析:
1、JobScheduler 通过onReceive方法接收各种消息并存入enventLoop消息循环体中。
2、通过rateController对流入SparkStreaming的数据进行限流控制。
3、在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法。
ReceiverTacker的启动方法:
1、ReceiverTracker启动后会创建ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息。
2、ReceiverTracker启动后会在Spark Cluster中启动executor中的Receivers。
JobGenerator的启动方法:
1、JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器
a. Spark Streaming Job 架构和运行机制
b. Spark Streaming Job 容错架构和运行机制
注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。
上节回顾:
上节课谈到Spark Streaming是基于DStream编程。DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,归根结底还是对其RDD进行的操作。
如果将Spark Streaming放在坐标系中,并以Y轴表示对RDD的操作,RDD的依赖关系构成了整个job的逻辑应用,以X轴作为时间。随着时间的流逝,以固定的时间间隔(Batch Interval)产生一个个job实例,进而在集群中运行。
同时也为大家详细总结并揭秘 Spark Streaming五大核心特征:特征1:逻辑管理、特征2:时间管理、特征3:流式输入和输出、特征4:高容错、特征5:事务处理。最后结合Spark Streaming源码做了进一步解析。
**
开讲
**
由上一讲可以得知,以固定的时间间隔(Batch Interval)产生一个个job实例。那么在时间维度和空间维度组成的时空维度的Spark Streaming中,Job的架构和运行机制、及其容错架构和运行机制是怎样的呢?
那我们从爱因斯坦的相对时空讲起吧:
a、时间和空间是紧密联系的统一体,也称为时空连续体。
b、时空是相对的,不同的观察者看到的时间,长度,质量都可以不一样。
c、对于两个没有联系的事件,没有绝对的先后顺序。但是因果关系可以确定事件的先后,比如Job的实例产生并运行在集群中,那么Job实例的产生事件必然发生在Job运行集群中之前。
就是说Job的实例产生和单向流动的时间之间,没有必然的联系;在这里时间只是一种假象。
怎么更好的理解这句话呢?那我们就得从以下方面为大家逐步解答。
什么是Spark Streaming Job 架构和运行机制 ?
对于一般的Spark应用程序来说,是RDD的action操作触发了Job的运行。那对于SparkStreaming来说,Job是怎么样运行的呢?我们在编写SparkStreaming程序的时候,设置了BatchDuration,Job每隔BatchDuration时间会自动触发,这个功能是Spark Streaming框架提供了一个定时器,时间一到就将编写的程序提交给Spark,并以Spark job的方式运行。
通过案例透视Job架构和运行机制
案例代码如下:
将上述代码打成JAR包,再上传到集群中运行
集群中运行结果如下
运行过程总图如下
案例详情解析
a、 首先通过StreamingContext调用start方法,其内部再启动JobScheduler的Start方法,进行消息循环;
(StreamingContext.scala,610行代码)
(JobScheduler.scala,83行代码)
b、 在JobScheduler的start内部会构造JobGenerator和ReceiverTacker;
(JobScheduler.scala,82、83行代码)
c、 然后调用JobGenerator和ReceiverTacker的start方法执行以下操作:
(JobScheduler.scala,79、98行代码)
(ReceiverTacker.scala,149、157行代码)
- JobGenerator启动后会不断的根据batchDuration生成一个个的Job ;
(JobScheduler.scala,208行代码)
- ReceiverTracker的作用主要是两点:
1.对Receiver的运行进行管理,ReceiverTracker启动时会调用lanuchReceivers()方法,进而会使用rpc通信启动Receiver(实际代码中,Receiver外面还有一层包装ReceiverSupervisor实现高可用)
(ReceiverTracker.scala,423行代码)
2.管理Receiver的元数据,供Job对数据进行索引,元数据的核心结构是receivedBlockTracker
(ReceiverTracker.scala,106~112行代码)
d、 在Receiver收到数据后会通过ReceiverSupervisor存储到Executor的BlockManager中 ;
e、 同时把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息;
这里面涉及到两个Job的概念:
每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行)
为什么使用线程池呢?
a 、作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;
b 、有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;
Spark Streaming Job 容错架构和运行机制
Spark Streaming是基于DStream的容错机制,DStream是随着时间流逝不断的产生RDD,也就是说DStream是在固定的时间上操作RDD,容错会划分到每一次所形成的RDD。
Spark Streaming的容错包括 Executor 与 Driver两方面的容错机制 :
a、 Executor 容错:
1. 数据接收:分布式方式、wal方式,先写日志再保存数据到Executor
2. 任务执行安全性 Job基于RDD容错 :
b、Driver容错 : checkpoint 。
基于RDD的特性,它的容错机制主要就是两种:
1. 基于checkpoint;
在stage之间,是宽依赖,产生了shuffle操作,lineage链条过于复杂和冗长,这时候就需要做checkpoint。
2. 基于lineage(血统)的容错:
一般而言,spark选择血统容错,因为对于大规模的数据集,做检查点的成本很高。
考虑到RDD的依赖关系,每个stage内部都是窄依赖,此时一般基于lineage容错,方便高效。
总结: stage内部做lineage,stage之间做checkpoint。