如何调优Spark Steraming

简介: 如何调优Spark Steraming

云计算和大数据密不可分,这里有必要详细讨论下我的老本行——大数据领域。未来几年,我们将很荣幸地见证大数据技术的容器化。首先我们用几篇文章深入地了解一下大数据领域的相关技术。

1. 背景和简介

Spark Streaming是Spark的一个组件,它把流处理当作离散微批处理,被称为离散流或DStream。Spark的核心是RDD,即弹性分布式数据集。RDD本质上是将数据分区(Partition)封装起来。而DStream是一个由时间驱动、逻辑封装的RDD。以下面这段代码为例:

def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger(PreJoin.getClass)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[2]").setAppName("PreJoin")
    //    val conf = new SparkConf().setAppName("PreJoin")
    /** 限制每秒钟从topic的每个partition最多消费的消息条数 */
    conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
    conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
    // 设置每五秒更新一下
    val ssc = new StreamingContext(conf, Seconds(5))
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._
    var zookeeperservers = ""
     //  hbase的相关配置
    val config: Configuration = HBaseConfiguration.create()
    val newAPIJobConfiguration = Job.getInstance(config)
   // 读取Kafka
    val topics = Array("result1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // 读取另外两个topic
    // 每30秒一个滑动窗口处理30秒内到达的数据,将Json数据解析出来
    val resultJson = stream.map(record => record.value).window(Seconds(30), Seconds(30)).transform(rdd => {
      ...
    })
    // resultJson2和resultJson3类似
    val resultJson2 =
    val resultJson3 = 
    // 合并流
    val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))
    // 遍历每个RDD,存入HBase
    resultJsonAll.foreachRDD(rdd => {
      val hbasePuts3 = data3.rdd.map((row: Row) => {
        ...
      })
      hbasePuts3.saveAsNewAPIHadoopDataset(newAPIJobConfiguration3.getConfiguration)
    })
    ssc.start() // 开始计算
    ssc.awaitTermination()
  } // 等待停止计算

上面这段代码描述了一个典型的Spark Streaming 的处理流程。它的功能是从Kafka拉取数据,经过一系列的转换,将结果存入HBase。我们可以看到流处理应用程序和批处理应用程序的一些区别。批处理应用程序拥有清晰的生命周期,它们一旦处理了输入文件就完成了执行。而上面的流处理应用程序的执行没有开始和停止的标记。

几个决定Spark Streaming应用程序生命周期的方法

方法 描述
start() 开始执行应用程序
awaitTermination() 等待应用程序终止
stop() 强制应用程序停止执行

一个Spark应用程序的执行过程如下图

Yarn-Cluster运行模式执行过程

spark 控制进程

守护进程(Daemon) 描述
Driver(驱动程序) 包含SparkContext实例的应用程序入口点
Master(主进程) 负责调度和资源编排
Worker(子进程) 负责节点状态和运行执行器
Executor(执行器) 根据作业分配,负责执行该作业派发的任务

为了减少网络流量,强烈建议在集群机器上运行驱动程序,例如在Master节点,特别是需要驱动程序从Worker中提取数据的情况。

Spark分层执行结构

实体 描述
Application(应用程序) SparkContext的一个实例
Job(作业) 一个Action后执行的一组阶段
Stage(阶段) 在shuffle内的一组转换
Task set(任务组) 来自同一组阶段的任务组
Task(任务) 一个阶段里的执行单元

有了上面的背景,我们下面便从几个方面来讨论下Spark Streaming的优化。

2. 调优

2.1 并行化

2.1.1 执行器Executor

num-executors

执行器是一个在每个Worker上执行的JVM进程。那么如何选择执行器的数量呢?理论上来说,既然executor是JVM进程,应该多一点才好。但是我们在选择executor数量的时候,有几条经验可供参考:

  • 为每个节点上的操作系统和其他服务留出一些资源
  • 如果在YARN上运行,也占用应用程序Master

executor-memory

该参数用于设置每个Executor进程的内存,Executor内存的大小,很多时候直接决定了Spark作业的性能。根据团队的资源队列的最大内存限制是多少, num-executors乘以 executor-memory,不能超过队列的最大内存量。建议申请的内存量最好不要超过资源队列最大总内存的1/3~1/2。

executor-cores

该参数置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。根据自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。建议 num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右。

2.1.2 任务(Task)

Spark中的task是执行的单元。任务以线程而不是执行器 的进程执行。每个DStream由RDD组成,而RDD又由分区组成。每个分区是一块独立的数据,由一个任务操作。因为一个RDD中的分区数与任务数之间存在几乎一对一的映射。也就是说,DStream并行度是分区数的函数。该分区数取决于依赖关系类型:跨过DStream时如果落在窄依赖区,分区数保持不变,但经过shuffle区由于宽依赖的缘故,这个分区数会发生改变。

shuffle的分区数由 spark.default.parallelism决定,或者如果 spark.default.parallelism未设置,则由构成父DStream的RDD中的最大分区数决定。

实现完全优化的并行度的最佳方法,就是不断试错,和常规Spark应用的调优的方法一样,控制逐渐增加分区的个数,每次将分区数乘以1.5,直到性能停止改进位置。这可以通过Spark UI 进行校准。

综上从Executor和Task的角度,得到Spark Streaming 的一些优化方法,提交Spark作业的脚本大概为:

./spark-submit \
  --master yarn \
  --num-executors 30 \
  --executor-memory 6G \
  --executor-cores 2 \
  --driver-memory 1G \
  --conf spark.default.parallelism=120 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \
  --class com.example.PreJoin Stream-1.0-SNAPSHOT-jar-with-dependencies.jar

2.1.3 创建更多的输入DStream和Receive

每个输入DStream都会在某个Worker的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个DStream达到接收多个数据流的效果。

比如,一个接收多个Kafka Topic的输入DStream,可以拆分成多个输入DStream,每个分别接收一个topic的数据。这样就会创建多个Receiver,从而并行地接收数据,提高吞吐量。然后多个DStream使用union算子进行合并,从而形成一个DStream。核心代码如下:

// 合并流
val resultJsonAll = ssc.union(Seq(resultJson2, resultJson, resultJson3))

2.2 批处理间隔

批处理间隔是另一个直接影响性能的重要因素。它确定了微批处理的周期,也就是规定了每个微批处理能够通过的数据量。批处理间隔设置得太高则每个批处理会有高延迟,设置得太低则导致资源利用不足。

理想的状况是能够以线路速率处理数据,例如数据源每300毫秒发送一次,那么我们也可以这样假设:处理管道数据的延迟时间也为300毫秒。

如何设置批处理间隔,最好采取的策略是每次试验都从高值开始,比如1.5倍。Spark日志可用于计算系统的稳定性,即批处理间隔能否跟上数据速率。在日志中查找 Totaldelay总延迟。如果此值保持接近批处理间隔,则系统是稳定的。否则尝试增加2.1所述的并行化来减少管道的延迟

假如日志如下:

JobScheduler: Total delay: 2.236 s for time 1581859740000 ms (execution: 2.256 s)

这表示该批处理的总延迟为3秒,小于批处理间隔。假设在某些时候可能会出现数据峰值,那么5秒是个不错的值。此外还可以通过Spark UI了解每阶段的延迟细目。Spark UI我们会在另一篇文章详细介绍。

2.3 内存

RDD基于内存计算,在内存中缓存所有内容,会给堆和垃圾收集器增加很大压力。如果应用程序的堆空间不足,可以增加 spark.executor.memory。此外有一些情况,Spark还会使用堆外内存,例如Java NIO采用的字节缓冲区。在YARN上,这个额外的内存分配由 spark.yarn.executor.memoryOverhead处理,默认值为 max(executorMemory*0.10384)。如果应用程序使用大量的堆外内存,那么应该增加这个因子。

一般来说,增加堆大小或堆外内存属于最后才会考虑的操作。我们首要的目标是减少应用程序的内存占用。下面介绍实现这一目标的三种方法。

2.3.1 序列化

RDD以序列化形式保存在内存中,可以减少内存使用并改善垃圾收集。默认情况下Spark使用Java序列化,这并不是很高效。Spark支持Kryo,Kryo更有效且性能高,可以将 spark.serializer设置为 org.apache.spark.serializer.KryoSerializer来启用Kryo。

2.3.2 压缩

除了序列化RDD之外。还可以将 spark.rdd.compress设置为true来进行压缩。

2.3.3 垃圾收集

流处理应用程序大量的对象增加了JVM垃圾收集的压力,频繁的GC会增加程序的延迟。建议对驱动程序和执行器使用CMS垃圾收集器,与应用程序同时运行垃圾收集来缩短暂停时间。

通过传递 --driver-java-options-XX:+UseConcMarkSweepGCspark-submit,为驱动程序启动CMS。对于执行器,将参数 spark.executor.extraJavaOptions设置为 XX:+UseConcMarkSweepGC,来启用CMS垃圾收集。

2.4 Shuffle

每次触发shuffle都会在集群中来回复制数据,这将付出很高的磁盘和网络I/O开销。因此在设计流应用程序的时候应该遵循一些原则:

2.4.1 提前投影过滤

提前进行投影和过滤,可以减少下游算子处理的数据。

2.4.2 多使用Combiner

Combiner使用的是map端聚合,可以减少在shuffle过程中需要处理的数据量。如使用reduceByKey(+)可以在shuffle之前的分区级别启用本地聚合。

2.4.2 大量运用并行化

shuffle操作内部使用分组操作的Hash映射来对分区空间进行分隔,这可能会导致堆空间耗尽。通过增加*ByKey()任务的的并行度,减少其工作集来避免这种情况。

2.4.3 文件合并

在大量shuffle任务的情况下,合并中间文件以改善磁盘查找是很有用的。可以设置 spark.shuffle.consolidateFilestrue,启用合并。

2.4.4 更多内存

RDD,shuffle和应用程序对象之间共用执行器Java堆。默认情况下,RDD使用内存的60%( spark.storage.memoryFraction),shuffle使用20%( spark.shuffle.memoryFraction)。过多地使用将使shuffle聚合阶段的数据溢出到磁盘。如果使用shuffle比较多,则可以适当增加shuffle内存的的占用比例,以减少对磁盘的溢出次数。

Shuffle所涉及的问题比较复杂,调优的点也很多,我们会在另一篇文章详细介绍。

相关文章
|
1月前
|
SQL 分布式计算 HIVE
sparksql 参数调优
sparksql 参数调优
|
SQL 存储 分布式计算
工作常用之Spark调优【一】
Spark 3.0 大版本发布, Spark SQL 的优化占比将近 50% 。 Spark SQL 取代 Spark Core ,成为新一代的引擎内核,所有其他子框架如 Mllib 、 Streaming 和 Graph ,都可以共享 SparkSQL 的性能优化,都能从 Spark 社区对于 Spark SQL 的投入中受益。
181 0
工作常用之Spark调优【一】
|
存储 SQL 数据挖掘
Apache Doris Join实现与调优实践
8月 14 号,由示说网和上海白玉兰开源开放研究院联合举办的开源大数据技术线上 Meetup 如期举行,Apache Doris 社区受邀参与本次 Meetup ,来自百度的数据内核高级研发工程师、Apache Doris Contributor 李昊鹏为大家带来了题为“ Apache Doris 的Join实现与调优实践 ”的主题分享,主要介绍了 Apache Doris Join 的实现机制以及调优策略实战,以下是分享内容。
470 0
Apache Doris Join实现与调优实践
|
缓存 分布式计算 NoSQL
调优 | Apache Hudi应用调优指南
通过Spark作业将数据写入Hudi时,Spark应用的调优技巧也适用于此。如果要提高性能或可靠性,请牢记以下几点。
482 0
|
缓存 分布式计算 资源调度
Spark面试题(六)——Spark资源调优
Spark资源调优的方法。
244 0
Spark面试题(六)——Spark资源调优
|
分布式计算 Java Spark
Spark面试题(八)——Spark的Shuffle配置调优
对Spark的Shuffle配置调优建议。
449 0
|
SQL 消息中间件 分布式计算
Spark面试题(五)——数据倾斜调优
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
245 0
Spark面试题(五)——数据倾斜调优
|
分布式计算 资源调度 Hadoop
hadoop调优及常用调优参数
hadoop调优及常用调优参数
|
SQL 分布式计算 运维
技本功|Hive优化之Spark执行引擎参数调优(二)
影响Hive效率的主要有数据倾斜、数据冗余、job的IO以及不同底层引擎配置情况和Hive本身参数和HiveSQL的执行等因素。 本文主要结合实际业务情况,在使用Spark作为底层引擎时,通过一些常见的配置参数对报错任务进行调整优化,主要包含以下两个方面:
1140 0
技本功|Hive优化之Spark执行引擎参数调优(二)
|
消息中间件 缓存 分布式计算
Spark调优策略
在利用Spark处理数据时,如果数据量不大,那么Spark的默认配置基本就能满足实际的业务场景。但是当数据量大的时候,就需要做一定的参数配置调整和优化,以保证业务的安全、稳定的运行。并且在实际优化中,要考虑不同的场景,采取不同的优化策略。

热门文章

最新文章