一、Spark Streaming功能介绍
(1)概述
Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理.Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。
关于spark streaming的更多了解可以见官方文档
http://spark.apache.org/docs/2.4.6/streaming-programming-guide.html#checkpointing
Spark Streaming处理的数据流图:
在内部,它的工作方式如下。Spark Streaming接收实时输入数据流,并将数据分成批处理,然后由Spark引擎进行处理,以生成批处理的最终结果流。
(2)DStream概述
和 Spark 基于 RDD 的概念很相似, Spark Streaming 使用 离散化流 (discretized stream)作 为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。 在内部, 每个 时间区间收到的数据都作为 RDD 存在, 而 DStream 是由这些 RDD 所组成的序列(因此 得名“离散化”)。 DStream 可以从各种输入源创建, 比如 Flume、Kafka 或者 HDFS。也可以通过对其他DStream应用高级操作来创建DStream。在内部,DStream表示为RDD序列 。 创 建出来的 DStream 支持两种操作, 一种是 转化操作 (transformation), 会生成一个新的 DStream, 另一种是 输出操作 (output operation) , 可以把数据写入外部系统中。 DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比 如滑动窗口。
批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。
时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。
窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数
滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数
Input DStream:一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。
对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一 个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream 中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的 一个RDD。底层的RDD的transformation操作,其实,还是由Spark Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API。
(3)Storm和Spark Streaming比较
处理模型以及延迟
虽然两框架都提供了可扩展性(scalability)和可容错性(fault tolerance),但是它们的处理模型从根本上说是不一样的。Storm可以实现亚秒级时延的处理,而每次只处理一条event,而Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event。所以说Storm可以实现亚秒级时延的处理,而Spark Streaming则有一定的时延。
容错和数据保证
然而两者的代价都是容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。这就意味着可变更的状态可能被更新两次从而导致结果不正确。
任一方面,Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次,即使是node节点挂掉。虽然说Storm的 Trident library可以保证一条记录被处理一次,但是它依赖于事务更新状态,而这个过程是很慢的,并且需要由用户去实现。
实现和编程API
Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。如果你想看看这两个框架是如何实现的或者你想自定义一些东西你就得记住这一点。Storm是由BackType和Twitter开发,而Spark Streaming是在UC Berkeley开发的。
Storm提供了Java API,同时也支持其他语言的API。 Spark Streaming支持Scala和Java语言(其实也支持Python)。
批处理框架集成
Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就可以想使用其他批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询。这就减少了单独编写流批量处理程序和历史数据处理程序。
生产支持
Storm已经出现好多年了,而且自从2011年开始就在Twitter内部生产环境中使用,还有其他一些公司。而Spark Streaming是一个新的项目,并且在2013年仅仅被Sharethrough使用(据作者了解)。
Storm是 Hortonworks Hadoop数据平台中流处理的解决方案,而Spark Streaming出现在 MapR的分布式平台和Cloudera的企业数据平台中。除此之外,Databricks是为Spark提供技术支持的公司,包括了Spark Streaming。
虽然说两者都可以在各自的集群框架中运行,但是Storm可以在Mesos上运行, 而Spark Streaming可以在YARN和Mesos上运行。
二、Spark Streaming服务架构及工作原理
Spark Streaming 为每个输入源启动对 应的 接收器 。 接收器以任务的形式运行在应用的执行器进程中, 从输入源收集数据也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset)。 它们收集到输入数据后会把数据复制到另一个执行器进程来保障容错性(默 认行为)。数据保存在执行器进程的内存中,和缓存 RDD 的方式一样 。驱动器程序中的 StreamingContext 会周期性地运行 Spark 作业来处理这些数据,把数据与之前时间区间中的 RDD 进行整合。下图显示了Spark Streaming的整个流程。
Spark Streaming 在 Spark 各组件中的执行过程:
Spark Streaming 对 DStream 提供的容错性与 Spark 为 RDD 所提供的容错性一致:只要输 入数据还在,它就可以使用 RDD 谱系重算出任意状态
三、StreamingContext原理详解
StreamingContext初始化的两种方式
第一种:
val ssc = new StreamingContext(sc, Seconds(1))
第二种:
val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
通常我们选择用第一种方法。
appName,是用来在Spark UI上显示的应用名称。master,是一个Spark、Mesos或者Yarn集群的URL,或者 是local[*]。
一个StreamingContext定义之后,必须做以下几件事情:
通过创建输入DStream来创建输入数据源。
通过对DStream定义transformation和output算子操作,来定义实时计算逻辑。
调用StreamingContext的start()方法,来开始实时处理数据。
调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停 止,或者就是让它持续不断的运行进行计算。
也可以通过调用StreamingContext的stop()方法,来停止应用程序。
需要注意的要点:
只要一个StreamingContext启动之后,就不能再往其中添加任何计算逻辑了。比如执行start()方 法之后,还给某个DStream执行一个算子。
一个StreamingContext停止之后,是肯定不能够重启的。调用stop()之后,不能再调用start()
一个JVM同时只能有一个StreamingContext启动。在你的应用程序中,不能创建两个 StreamingContext。
一个SparkContext可以创建多个StreamingContext,只要上一个先用stop(false)停止,再创建下 一个即可。
四、DStream和Receiver详解
输入DStream代表了来自数据源的输入数据流。在之前的wordcount例子中,lines就是一个输入 DStream(JavaReceiverInputDStream),代表了从netcat(nc)服务接收到的数据流。除了文件数 据流之外,所有的输入DStream都会绑定一个Receiver对象,该对象是一个关键的组件,用来从数据 源接收数据,并将其存储在Spark的内存中,以供后续处理。
Spark Streaming提供了两种内置的数据源支持;
1、基础数据源:StreamingContext API中直接提供了对这些数据源的支持,比如文件、socket、 Akka Actor等。
2、高级数据源:诸如Kafka、Flume、Kinesis、Twitter等数据源,通过第三方工具类提供支持。这
些数据源的使用,需要引用其依赖。
3、自定义数据源:我们可以自己定义数据源,来决定如何接受和存储数据。
要注意的是,如果你想要在实时计算应用中并行接收多条数据流,可以创建多个输入DStream。这样 就会创建多个Receiver,从而并行地接收多个数据流。但是要注意的是,一个Spark Streaming Application的Executor,是一个长时间运行的任务,因此,它会独占分配给Spark Streaming Application的cpu core。从而只要Spark Streaming运行起来以后,这个节点上的cpu core,就没法 给其他应用使用了。
使用本地模式,运行程序时,绝对不能用local或者local[1],因为那样的话,只会给执行输入 DStream的executor分配一个线程。而Spark Streaming底层的原理是,至少要有两条线程,一条线程 用来分配给Receiver接收数据,一条线程用来处理接收到的数据。因此必须使用local[n],n>=2的模 式。
如果不设置Master,也就是直接将Spark Streaming应用提交到集群上运行,那么首先,必须要求集 群节点上,有>1个cpu core,其次,给Spark Streaming的每个executor分配的core,必须>1,这样, 才能保证分配到executor上运行的输入DStream,两条线程并行,一条运行Receiver,接收数据;一 条处理数据。否则的话,只会接收数据,不会处理数据。