《Spark大数据分析实战》——3.2节Spark Streaming

简介:

本节书摘来自华章社区《Spark大数据分析实战》一书中的第3章,第3.2节Spark Streaming,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.2 Spark Streaming
Spark Streaming是一个批处理的流式计算框架。它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性。下面将对Spark Streaming进行详细的介绍。
3.2.1 Spark Streaming简介
Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力。Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就是RDD)都会生成一个Spark Job进行处理,最终以批处理的方式处理每个时间片的数据。请参照图3-6。


6a55160c3c99cfc263389c24a1bb9d8f06a96fda

Spark Streaming编程接口和Spark很相似。在Spark中,通过在RDD上用Transformation(例如:map, f?ilter等)和Action(例如:count, collect等)算子进行运算。在Spark Streaming中通过在DStream(表示数据流的RDD序列)上进行算子运算。图3-7为Spark Streaming转
化过程。


5ad3fdd38d6e3e4cce68fe00e38b4ca809cfea8c

图3-7中Spark Streaming将程序中对DStream的操作转换为DStream有向无环图(DAG)。对每个时间片,DStream DAG会产生一个RDD DAG。在RDD中通过Action算子触发一个Job,然后Spark Streaming会将Job提交给JobManager。JobManager会将Job插入维护的Job队列,然后JobManager会将队列中的Job逐个提交给Spark DAGScheduler,然后Spark会调度Job并将Task分发到各节点的Executor上执行。
(1)优势及特点
1)多范式数据分析管道:能和Spark生态系统其他组件融合,实现交互查询和机器学习等多范式组合处理。
2)扩展性:可以运行在100个节点以上的集群,延迟可以控制在秒级。
3)容错性:使用Spark的Lineage及内存维护两份数据进行备份达到容错。RDD通过Lineage记录下之前的操作,如果某节点在运行时出现故障,则可以通过冗余备份数据在其他节点重新计算得到。


f34fd1b6c5ec0aa2d1e3717baab0357054d903a4

5)实时性:Spark Streaming也是一个实时计算框架,Spark Streaming能够满足除对实时性要求非常高(例如:高频实时交易)之外的所有流式准实时计算场景。目前Spark Streaming最小的Batch Size的选取在0.5~2s(对比:Storm目前最小的延迟是100ms左右)。
(2)适用场景
Spark Streaming适合需要历史数据和实时数据结合进行分析的应用场景,对于实时性要求不是特别高的场景也能够胜任。
3.2.2 Spark Streaming架构
通过图3-10,读者可以对Spark Streaming的整体架构有宏观把握。


18cb9e4550d1d36a8168bf9ed0c99a10dabb731c

组件介绍:
Network InputTracker:通过接收器接收流数据,并将流数据映射为输入DStream。
Job Scheduler:周期性地查询DStream图,通过输入的流数据生成Spark Job,将Spark Job提交给Job Manager进行执行。
JobManager:维护一个Job队列,将队列中的Job提交到Spark进行执行。
通过图3-10可以看到D-Stream Lineage Graph进行整体的流数据的DAG图调度,Taskscheduler负责具体的任务分发,Block tracker进行块管理。在从节点,如果是通过网络输入的流数据会将数据存储两份进行容错。Input receiver源源不断地接收输入流,Task execution负责执行主节点分发的任务,Block manager负责块管理。Spark Streaming整体架构和Spark很相近,很多思想是可以迁移理解的。
3.2.3 Spark Streaming原理剖析
下面将由一个example示例,通过源码呈现Spark Streaming的底层机制。
1.?初始化与接收数据
Spark Streaming通过分布在各个节点上的接收器,缓存接收到的流数据,并将数据包装成Spark能够处理的RDD的格式,输入到Spark Streaming,之后由Spark Streaming将作业提交到Spark集群进行执行,如图3-11所示。


db6b3980c442cb1573cd4d3a83c7d866efc6d0f2

初始化的过程主要可以概括为两点。
1)调度器的初始化。
调度器调度Spark Streaming的运行,用户可以通过配置相关参数进行调优。
2)将输入流的接收器转化为RDD在集群进行分布式分配,然后启动接收器集合中的每个接收器。
针对不同的数据源,Spark Streaming提供了不同的数据接收器,分布在各个节点上的每个接收器可以认为是一个特定的进程,接收一部分流数据作为输入。
用户也可以针对自身生产环境状况,自定义开发相应的数据接收器。
如图3-12所示,接收器分布在各个节点上。通过下面代码,创建并行的、在不同Worker节点分布的receiver集合。

val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, 
Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
// 在这里创造RDD相当于进入SparkContext.makeRDD
// 此处将receivers的集合作为一个RDD进行分区RDD[Receiver]
// 即使是只有一个输入流,按照这个分布式也是流的输入端在worker而不再Master 
…
// 将receivers的集合打散,然后启动它们
…
ssc.sparkContext.runJob(tempRDD, startReceiver)
…
    }


a4f4a0ae32c2a6e587cdba2668e7caea16eb9e09

2.?数据接收与转化
在“初始化与接收数据”部分中已经介绍过,receiver集合转换为RDD,在集群上分布式地接收数据流。那么每个receiver是怎样接收并处理数据流的呢?读者可以通过图3-13,对输入流的处理有一个全面的了解。图3-13为Spark Streaming数据接收与转化的示意图。
图3-13的主要流程如下。
1)数据缓冲:在receiver的receive函数中接收流数据,将接收到的数据源源不断地放入到BlockGenerator.currentBuffer。
2)缓冲数据转化为数据块:在BlockGenerator中有一个定时器(RecurringTimer),将当前缓冲区中的数据以用户定义的时间间隔封装为一个数据块Block,放入到BlockGenerator的blocksForPush队列中(这个队列)。
3)数据块转化为Spark数据块:在BlockGenerator中有一个BlockPushingThread线程,不断地将blocksForPush队列中的块传递给BlockManager,让BlockManager将数据存储为块。BlockManager负责Spark中的块管理。
4)元数据存储:在pushArrayBuffer方法中还会将已经由BlockManager存储的元数据信息(例如:Block的id号)传递给ReceiverTracker,ReceiverTracker会将存储的blockId放到对应StreamId的队列中。


56b7343bc7c80334b5fb15006c00c7697d52c822

图中部分组件的作用如下:

KeepPushingBlocks:调用此方法持续写入和保持数据块。
pushArrayBuffer:调用pushArrayBuffer方法将数据块存储到BlockManager中。
reportPushedBlock:存储完成后汇报数据块信息到主节点。
receivedBlockInfo(Meta Data):已经接收到的数据块元数据记录。
streamId:数据流Id。
BlockInfo:数据块元数据信息。
BlockManager.put:数据块存储器写入备份数据块到其他节点。
Receiver:数据块接收器,接收数据块。
BlockGenerator:数据块生成器,将数据缓存生成Spark能处理的数据块。
BlockGenerator.currentBuffer:缓存网络接收的数据记录,等待之后转换为Spark的数据块。
BlockGenerator.blocksForPushing:将一块连续数据记录暂存为数据块,待后续转换为Spark能够处理的BlockManager中的数据块(A Block As a BlockManager’s Block)。
BlockGenerator.blockPushingThread:守护线程负责将数据块转换为BlockManager中数据块。
ReceiveTracker:输入数据块的元数据管理器,负责管理和记录数据块。
BlockManager:Spark数据块管理器,负责数据块在内存或磁盘的管理。
RecurringTimer:时间触发器,每隔一定时间进行缓存数据的转换。
上面的过程中涉及最多的类就是BlockGenerator,在数据转化的过程中其扮演者不可或缺的角色。
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf
  ) extends Logging 

感兴趣的读者可以参照图中所示的类和方法进行更加具体的机制的了解。篇幅所限,对这个数据生成过程不再做具体的代码剖析。
3.?生成RDD与提交Spark Job
Spark Streaming根据时间段,将数据切分为RDD,然后触发RDD的Action提交Job,Job被提交到Job Manager中的Job Queue中由Job Scheduler调度,之后Job Scheduler将Job提交到Spark的Job调度器,然后将Job转换为大量的任务分发给Spark集群执行,如图3-14所示。


eff424d7864e443e868c04ef024584013a3de04e

Job generator中通过下面的方法生成Job进行调度和执行。
从下面的代码可以看出job是从outputStream中生成的,然后再触发反向回溯执行整个DStream DAG,类似RDD的机制。

private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
// 获取输入数据块的元数据信息
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
         . . .
        }.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
eventActor !DoCheckpoint(time)
  }
// 下面进入JobScheduler的submitJobSet方法一探究竟,JobScheduler是整个Spark  
   Streaming调度的核心组件
def submitJobSet(jobSet: JobSet) {
    . . .
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
 . . .
  }

// 进入Graph生成job的方法,Graph本质是DStreamGraph类生成的对象
final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
  . . .
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
   . . .
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
   . . .
  }

// outputStreams中的对象是DStream,下面进入DStream的generateJob一探究竟

private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
// 此处相当于针对每个时间段生成的一个RDD,会调用SparkContext的方法runJob提交Spark的一个Job
context.sparkContext.runJob(rdd, emptyFunc)
        }
Some(new Job(time, jobFunc))
      }
case None => None
    }
  }

// 在DStream算是父类,一些具体的DStream例如SocketInputStream等的类的父类可以通过
  SocketInputDStream看是如何通过上面的getOrCompute生成RDD的
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {

generatedRDDs.get(time) match {
      . . .
case None => {
if (isTimeValid(time)) {

// Dstream是个父类,这里代表的是子类的compute方法,DStream通过compute调用用户自定义函数。当任务执行时,同一个stage中的DStream函数会串联依次执行
compute(time) match {
            . . .
generatedRDDs.put(time, newRDD)
           . . .
  }
在SocketInputDStream的compute方法中生成了对应时间片的RDD:
override def compute(validTime: Time): Option[RDD[T]] = {
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }

Spark Streaming在保证实时处理的要求下还能够保证高吞吐与容错性。用户的数据分析中很多情况下也存在需要分析图数据,运行图算法,通过GraphX可以简便地开发分布式图分析算法。

相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
156 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
77 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
104 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
130 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
93 1
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
75 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
70 1
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
130 0