Spark on Yarn: Cluster模式Scheduler实现

简介:

背景

Spark on Yarn分yarn-cluster和yarn-client两种模式。
本文通过Cluster模式的TaskScheduler实现入手,梳理一遍spark on yarn的大致实现逻辑。

前提我对两种模式以及yarn任务的整体运行逻辑不是很清楚。

主体逻辑

cluster模式中,使用的TaskScheduler是YarnClusterScheduler
它继承了默认使用的TaskSchedulerImpl类,额外在postStartHook方法里,唤醒了ApplicationMaster类的设置sparkcontext的方法。
ApplicationMaster相当于是spark在yarn上的AM,内部的YarnRMClient类,负责向RM注册和注销AM,以及拿到attemptId。注册AM之后,得到一个可以申请/释放资源的YarnAllocationHandler类,从而可以维护container与executor之间的关系。

下节具体介绍几个主要类的实现逻辑。

具体实现

AM

ApplicationMaster,通过YarnRMClient来完成自己的注册和注销。

AM的启动方式

/**
 * This object does not provide any special functionality. It exists so that it's easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {

  def main(args: Array[String]) = {
    ApplicationMaster.main(args)
  }

}

main里面调用AM的run方法:

  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
      System.exit(master.run())
    }
  }

如果AM的启动参数里有用户自己定义的类,则是Driver模式,即cluster模式。用户自己定义的类里面带了spark driver,会在单独一个线程里启动。这也是cluster模式与client模式的区别,用户实现了driver vs 用户只是提交app。

run方法里
1. 如果不是Driver模式,执行runExecutorLauncher逻辑:
启动后,执行registerAM,里面new了YarnAllocator的实现,调用allocateResources,申请并执行container。同时,启动一个reporter线程,每隔一段时间调用YarnAllocator的allocateResources方法,或汇报有太多executor fail了。
2. 如果是Driver模式,执行runDriver逻辑:
也是执行registerAM,但是之前需要反射执行jar包里用户定义的driver类。

YarnAllocator

YarnAllocator负责向yarn申请和释放containers,维护containe、executor相关关系,有一个线程池。申请到container之后,在container里执行ExecutorRunnable。需要子类实现的是申请和释放这两个方法:

protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
protected def releaseContainer(container: Container): Unit

YarnAllocationHandler继承了YarnAllocator。

  1. allocateContainers方法:
    Yarn api里提供ResourceRequest这个类,里面包含了一个app向RM索要不同container的信息,包括机器名/机架名,cpu和mem资源数,container数,优先级,locality是否放松。然后组成AllocateRequest类,代表AM向RM从集群里获得resource。调用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM发起资源请求**。
  2. releaseContainer方法:
    每次把需要release的container记录下来。在每次allocateContainers调用的时候,
    会往AllocateRequest里addAllReleases(releasedContainerList),在请求资源的时候顺便把历史资源释放掉。

ExecutorRunnable与Yarn的关系:
1. 向ContainerManager建立连接,让cm来startContainer。
2. ContainerLaunchContext包含了yarn的NodeManager启动一个container需要的所有信息。ExecutorRunnable会构建这个container申请信息。
可以参考这段启动逻辑:

def startContainer = {
    logInfo("Setting up ContainerLaunchContext")

    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]

    ctx.setContainerId(container.getId())
    ctx.setResource(container.getResource())
    val localResources = prepareLocalResources
    ctx.setLocalResources(localResources)

    val env = prepareEnvironment
    ctx.setEnvironment(env)

    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
      appAttemptId, localResources)
    logInfo("Setting up executor with commands: " + commands)
    ctx.setCommands(commands)

    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
    }

    // Send the start request to the ContainerManager
    val startReq = Records.newRecord(classOf[StartContainerRequest])
    .asInstanceOf[StartContainerRequest]
    startReq.setContainerLaunchContext(ctx)
    cm.startContainer(startReq)
  }

值得注意的是setServiceData方法,如果在node manager上启动了external shuffle service。Yarn的AuxiliaryService支持在NodeManager上启动辅助服务。spark有一个参数spark.shuffle.service.enabled来设置该服务是否被启用,我看的1.2.0版本里貌似没有服务的实现代码。

Executor

此外,从ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通过命令行启动了CoarseGrainedExecutorBackend进程,与粗粒度的mesos模式和standalone模式一致,task最终落到CoarseGrainedExecutorBackend里面执行。

全文完:)

目录
相关文章
|
5月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
301 3
|
2月前
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
本文详细解析了 Apache Spark 的两种常见部署模式:Standalone 和 YARN。Standalone 模式自带轻量级集群管理服务,适合小规模集群;YARN 模式与 Hadoop 生态系统集成,适合大规模生产环境。文章通过示例代码展示了如何在两种模式下运行 Spark 应用程序,并总结了两者的优缺点,帮助读者根据需求选择合适的部署模式。
112 3
|
3月前
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
【10月更文挑战第5天】随着大数据处理需求的增长,Apache Spark 成为了广泛采用的大数据处理框架。本文详细解析了 Spark Standalone 与 YARN 两种常见部署模式的区别,并通过示例代码展示了如何在不同模式下运行 Spark 应用程序。Standalone 模式自带轻量级集群管理,适合小规模集群或独立部署;YARN 则作为外部资源管理器,能够与 Hadoop 生态系统中的其他应用共享资源,更适合大规模生产环境。文章对比了两者的资源管理、部署灵活性、扩展性和集成能力,帮助读者根据需求选择合适的部署模式。
55 1
|
4月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
58 5
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
149 0
|
5月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
480 2
|
6月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
4月前
|
资源调度 分布式计算 Hadoop
YARN(Hadoop操作系统)的架构
本文详细解释了YARN(Hadoop操作系统)的架构,包括其主要组件如ResourceManager、NodeManager和ApplicationMaster的作用以及它们如何协同工作来管理Hadoop集群中的资源和调度作业。
194 3
YARN(Hadoop操作系统)的架构
|
4月前
|
资源调度 分布式计算 Hadoop
使用YARN命令管理Hadoop作业
本文介绍了如何使用YARN命令来管理Hadoop作业,包括查看作业列表、检查作业状态、杀死作业、获取作业日志以及检查节点和队列状态等操作。
97 1
使用YARN命令管理Hadoop作业
|
5月前
|
资源调度 分布式计算 算法
【揭秘Yarn调度秘籍】打破资源分配的枷锁,Hadoop Yarn权重调度全攻略!
【8月更文挑战第24天】在大数据处理领域,Hadoop Yarn 是一种关键的作业调度与集群资源管理工具。它支持多种调度器以适应不同需求,默认采用FIFO调度器,但可通过引入基于权重的调度算法来提高资源利用率。该算法根据作业或用户的权重值决定资源分配比例,权重高的可获得更多计算资源,特别适合多用户共享环境。管理员需在Yarn配置文件中启用特定调度器(如CapacityScheduler),并通过设置队列权重来实现资源的动态调整。合理配置权重有助于避免资源浪费,确保集群高效运行,满足不同用户需求。
83 3