《深入理解Spark:核心思想与源码分析》——3.6节创建任务调度器TaskScheduler

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.6节创建任务调度器TaskScheduler,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.6 创建任务调度器TaskScheduler
TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler也可以看做任务调度的客户端。创建TaskScheduler的代码如下。

private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

createTaskScheduler方法会根据master的配置匹配部署模式,创建TaskSchedulerImpl,并生成不同的SchedulerBackend。本章为了使读者更容易理解Spark的初始化流程,故以local模式为例,其余模式将在第7章详解。master匹配local模式的代码如下。

master match {
    case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

3.6.1 创建TaskSchedulerImpl
TaskSchedulerImpl的构造过程如下:
1)从SparkConf中读取配置信息,包括每个任务分配的CPU数、调度模式(调度模式有FAIR和FIFO两种,默认为FIFO,可以修改属性spark.scheduler.mode来改变)等。
2)创建TaskResultGetter,它的作用是通过线程池(Executors.newFixedThreadPool创建的,默认4个线程,线程名字以task-result-getter开头,线程工厂默认是Executors.default-ThreadFactory)对Worker上的Executor发送的Task的执行结果进行处理。
TaskSchedulerImpl的实现见代码清单3-29。
代码清单3-29 TaskSchedulerImpl的实现

var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
    case e: java.util.NoSuchElementException =>
        throw new SparkException(s"Unrecognized spark.scheduler.mode: $scheduling-ModeConf")
}

// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
TaskSchedulerImpl的调度模式有FAIR和FIFO两种。任务的最终调度实际都是落实到接口SchedulerBackend的具体实现上的。为方便分析,我们先来看看local模式中SchedulerBackend的实现LocalBackend。LocalBackend依赖于LocalActor与ActorSystem进行消息通信。LocalBackend的实现参见代码清单3-30。
代码清单3-30 LocalBackend的实现
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
    extends SchedulerBackend with ExecutorBackend {

    private val appId = "local-" + System.currentTimeMillis
    var localActor: ActorRef = null

    override def start() {
        localActor = SparkEnv.get.actorSystem.actorOf(
            Props(new LocalActor(scheduler, this, totalCores)),
            "LocalBackendActor")
    }

    override def stop() {
        localActor ! StopExecutor
    }

    override def reviveOffers() {
        localActor ! ReviveOffers
    }

    override def defaultParallelism() =
        scheduler.conf.getInt("spark.default.parallelism", totalCores)

    override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
        localActor ! KillTask(taskId, interruptThread)
    }

    override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
        localActor ! StatusUpdate(taskId, state, serializedData)
    }

    override def applicationId(): String = appId
}

3.6.2 TaskSchedulerImpl的初始化
创建完TaskSchedulerImpl和LocalBackend后,对TaskSchedulerImpl调用方法initialize进行初始化。以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:
1)使TaskSchedulerImpl持有LocalBackend的引用。
2)创建Pool,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息。
3)创建FIFOSchedulableBuilder,FIFOSchedulableBuilder用来操作Pool中的调度队列。
initialize方法的实现见代码清单3-31。
代码清单3-31 TaskSchedulerImpl的初始化

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
        schedulingMode match {
            case SchedulingMode.FIFO =>
                new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
                new FairSchedulableBuilder(rootPool, conf)
        }
    }
    schedulableBuilder.buildPools()
}
相关文章
|
2月前
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
201 3
|
7月前
|
分布式计算 监控 Spark
Spark 任务运行时日志分析
Spark 任务运行时日志分析
103 0
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
47 5
|
6月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7187 43
EMR Serverless Spark PySpark流任务体验报告
|
3月前
|
SQL 机器学习/深度学习 分布式计算
Spark适合处理哪些任务?
【9月更文挑战第1天】Spark适合处理哪些任务?
194 3
|
4月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
|
5月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
426 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
5月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
285 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
219 0