【k8s系列2】spark on k8s(kubernetes) DynamicResourceAllocation(DRA)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 【k8s系列2】spark on k8s(kubernetes) DynamicResourceAllocation(DRA)
 随着大数据时代的到来,以及kubernetes的愈发火热,好多公司已经把spark应用从yarn迁移到k8s,当然也踩了不少的坑,    
 现在我们来分析一下spark on k8s的DynamicResourceAllocation这个坑

spark on yarn 中的DynamicResourceAllocation


spark on yarn对于DynamicResourceAllocation分配来说,从spark 1.2版本就已经开始支持了.

对于spark熟悉的人都知道,如果我们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务,

对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark应用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的效果


我们直接从CoarseGrainedExecutorBackend中SparkEnv创建开始说,每一个executor的启动,必然会经过CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的创建

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

而sparkEnv的创建就涉及到BlockManager的创建。沿着代码往下走,最终

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

在blockManager的initialize方法中,就会进行registerWithExternalShuffleServer

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
     registerWithExternalShuffleServer()
   }

如果我们开启了ExternalShuffleService,对于yarn就是YarnShuffleService,就会把当前的ExecutorShuffleInfo注册到host为shuffleServerId.host, port为shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)

这里我重点分析一下registerWithExternalShuffleServer的方法中的以下片段

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
         shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)            

该代码中shuffleServerId来自于:

shuffleServerId = if (externalShuffleServiceEnabled) {
     logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {
     blockManagerId
   }

而blockTransferService.hostName 是我们在SparkEnv中创建的时候由advertiseAddress传过来的,

最终由CoarseGrainedExecutorBackend 主类参数hostname过来的,那到底怎么传过来的呢?

参照ExecutorRunnable的prepareCommand方法,

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

而这个hostname的值最终由YarnAllocator的方法runAllocatedContainers

val executorHostname = container.getNodeId.getHost

传递过来的,也就是说我们最终获取到了yarn节点,也就是nodeManager的host

这样每个启动的executor,就向executor所在的nodeManager的YarnShuffleService注册了ExecutorShuffleInfo信息,这样对于开启了动态资源分配的

ExternalBlockStoreClient 来说fetchBlocksg过程就和未开启动态资源分配的NettyBlockTransferService大同小异了


spark on k8s(kubernetes) 中的DynamicResourceAllocation


参考之前的文章,我们知道在entrypoint中我们在启动executor的时候,我们传递了hostname参数

executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

而SPARK_EXECUTOR_POD_IP是运行中的POD IP,参考BasicExecutorFeatureStep类片段:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())

这样按照以上流程的分析,即使我们在每个k8s节点开启ExternalShuffleService服务,且pod挂载了持久化盘,

executor也不能向k8s节点ExternalShuffleService服务注册,因为我们注册的节点是POD IP,而不是节点IP,

当然spark社区早就提出了未开启external shuffle service的动态资源分配,且已经合并到master分支.

具体配置,可以参照如下:

spark.dynamicAllocation.enabled  true 
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  4
spark.dynamicAllocation.executorIdleTimeout  60s
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
9天前
|
Kubernetes 应用服务中间件 nginx
Kubernetes 入门指南:快速上手容器编排
【8月更文第29天】Kubernetes(简称 K8s)是一个开源平台,用于自动化容器化应用程序的部署、扩展和管理。它提供了一种便捷的方式来部署和运行应用程序,而无需关心底层基础设施的细节。本指南将带你从零开始学习 Kubernetes 的基础知识,并帮助你部署第一个应用。
41 0
|
8天前
|
存储 Kubernetes Cloud Native
探索Python编程的奥秘云原生时代的容器编排:Kubernetes入门与实践
【8月更文挑战第30天】本文以浅显易懂的方式,探讨了Python编程的核心概念和技巧。从基础语法到高级特性,再到实际应用案例,逐步引导读者深入理解Python编程的精髓。通过本文的学习,读者将能够掌握Python编程的基本技能,并激发进一步探索的兴趣。
24 13
|
13天前
|
Kubernetes 容器 Perl
【Azure K8S】AKS升级 Kubernetes version 失败问题的分析与解决
【Azure K8S】AKS升级 Kubernetes version 失败问题的分析与解决
|
15天前
|
运维 Kubernetes 监控
在K8S中,Kubernetes常见的部署方式有哪些?
在K8S中,Kubernetes常见的部署方式有哪些?
|
15天前
|
存储 Kubernetes API
在K8S中,Kubernetes的组件有哪些?
在K8S中,Kubernetes的组件有哪些?
|
15天前
|
Kubernetes 负载均衡 开发者
在K8S中,Kubernetes与Docker有什么关系?
在K8S中,Kubernetes与Docker有什么关系?
|
15天前
|
Kubernetes 负载均衡 调度
在K8S中,什么是kubernetes?
在K8S中,什么是kubernetes?
|
15天前
|
Prometheus Kubernetes 监控
在K8S中,如何进行Kubernetes迁移工作?
在K8S中,如何进行Kubernetes迁移工作?
|
4天前
|
存储 Kubernetes 负载均衡
CentOS 7.9二进制部署K8S 1.28.3+集群实战
本文详细介绍了在CentOS 7.9上通过二进制方式部署Kubernetes 1.28.3+集群的全过程,包括环境准备、组件安装、证书生成、高可用配置以及网络插件部署等关键步骤。
48 3
CentOS 7.9二进制部署K8S 1.28.3+集群实战
|
4天前
|
Kubernetes 负载均衡 前端开发
二进制部署Kubernetes 1.23.15版本高可用集群实战
使用二进制文件部署Kubernetes 1.23.15版本高可用集群的详细教程,涵盖了从环境准备到网络插件部署的完整流程。
12 2
二进制部署Kubernetes 1.23.15版本高可用集群实战
下一篇
DDNS