Spark Streaming Dynamic Resource Allocation

简介: The goal is to make processing time infinitely close to duration by reducing/increasing resource in spark streaming . And we also hope having a reas

Problem Statement

DRA has already been implemented since Spark 1.2 . However the existing Spark DRA on Yarn implementation does not embody the specific property of Spark Streaming.  
Spark DRA works when there are some executors being idle for  removeExecutorInterval time,then they will be removed or when there is a backlog of pending tasks waiting to be rescheduled,then new executors will be added。This mechanism works fine for Long-Time stage。 Spark Streaming is known as  micro batch processing,hence most of time the duration of batch is small. It will cause churn when removing executors at the end of the batch and adding executors at the beginning of next batch. 
 In real-time data processing, data slowly  increase or slowly down,so the adjacent batches have almost the same data to process.
The perfect result of spark streaming is processing time equal to duration. So the key concept is reducing/increasing resource until processing time infinitely close to duration .

Goals

The goal is to make processing time  infinitely close to duration by reducing/increasing resource in  spark streaming . And we also hope having a reasonable stable time  after short-time adjustment which means resource adjustment rarely  happens

Design Summary

The same as spark DRA, Spark Streaming DRA is also  disabled,but can be enabled by a new Spark conf property spark.streaming.dynamicAllocation.enabled. All related Spark Streaming DRA properties introduced will bepackaged in the spark.streaming.dynamicAllocation.* namespace

Adding Executors

The job in Spark Streaming is  time-sensitive  .  Once  there is a batch delayed it triggers Adding-Executors action. This Action  will request  the number of spark.streaming.dynamicAllocation.maxExecutors executors from Yarn immediately in  greedy way since we hope we can eliminate the delay as soon as possible ,at the same time,we also should take care  of the situation when requesting resource with multi rounds  may have resource scarcity  in shared cluster.
Removing Executors
As mentioned before, Spark Streaming,  time-sensitive , so the action removing executors  is not allowed to consume too much time of duration. So executors marked as removed will be added to pendingToRemove set so new tasks will not be launched in them ,then we acknowledge yarn to kill them asynchronously.

Algorithm of DRA

To calculate the number of executor should be removed every round,the formula is used as following
val totalRemoveExecutorNum = Math.round(
currentExecutors * (
(duration.toDouble - processDuration) / duration - reserveRate)
)
val actualShouldRemoveExecutorNumInThisRound = totalRemoveExecutorNum / releaseRounds
In this formula, we suppose processing time  has a strong relationship with executor number,however,they are not  linear relationship. To fix this, we add some new parameters like  reserveRate and releaseRounds to make sure we fit this non-linear relationship .
We also provide a  heuristic strategy to reduce executor number. This formula will be calculate in every round, so every round the number will be readjusted circularly ,and finally actualShouldRemoveExecutorNumInThisRound will converge to zero .

Implementation of DRA

New classes should be added in spark streaming module:
package org.apache.spark.streaming
private[spark] class StreamingExecutorAllocationManager(client: ExecutorAllocationClient,
                                                        duration: Long,
                                                        steamingListenerBus: StreamingListenerBus,
                                                        listenerBus: LiveListenerBus,
                                                        conf: SparkConf) extends Logging {
  allocationManager =>
......
}

private class StreamingExecutorAllocationListener extends SparkListener {
......
}

private class StreamingSchedulerListener extends StreamingListener {
.....
}
StreamingExecutorAllocationManager is initialed in StreamingContext 
Class affected in spark core module:
package org.apache.spark
private[spark] trait ExecutorAllocationClient {
   //blackList who can remove executors immediately make Yarn remove them asynchronously possible
  def addExecutorToPendingStatus(executorId: String): Unit = {}

  //cause spark streaming is initial late then spark core module,
  // we need to know how many executors we already have when 
  // spark streaming is initaled
  def executors(): List[String] = { List() }
}
SparkContext and CoarseGrainedSchedulerBackend both are affected because they are extend from ExecutorAllocationClient.
JobScheduler is also should be modified so before batch job submitted we can adjust resource.
package org.apache.spark.streaming.scheduler
private[streaming]class JobScheduler(val ssc: StreamingContext) extends Logging {

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)

      //before job submitted,we should compute resource actually required
      ssc.executorAllocationManager match {
        case Some(eam)=>  eam.run()
        case None => //do nothing
      }
   .........
    }
  }
......}

DRA Available Properties

Enable DRA:
spark.streaming.dynamicAllocation.enabled=true
Upper/Lower bound for the number of executors if dynamic allocation is enabled.
spark.streaming.dynamicAllocation.minExecutors=0
spark.streaming.dynamicAllocation.maxExecutors=50
More message will be printed in log if DRA is enabled. Default is false
spark.streaming.dynamicAllocation.debug=true
Rounds(Batches) will be used to release the resource calculated in current round. Default is 5
spark.streaming.dynamicAllocation.releaseRounds=5
The number of rounds(Batches) should be remembered. We can increase this number if batch processing time is unstable . this number affects processDuration in    (duration.toDouble - processDuration) .Default is 1 
spark.streaming.dynamicAllocation.rememberBatchSize=1
DRA delays to  work when  the specific number of rounds has been submitted. Default is 10
spark.streaming.dynamicAllocation.delay.rounds=10
Make sure the resource  is more than reserveRate * current number of Executors. More useful than minExecutorNumber. Default is 0.2
spark.streaming.dynamicAllocation.reserveRate=0.2
目录
相关文章
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0
|
7天前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
|
1月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
43 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
30 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
29 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
30 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
39 0