【k8s系列2】spark on k8s(kubernetes) DynamicResourceAllocation(DRA)

简介: 【k8s系列2】spark on k8s(kubernetes) DynamicResourceAllocation(DRA)
 随着大数据时代的到来,以及kubernetes的愈发火热,好多公司已经把spark应用从yarn迁移到k8s,当然也踩了不少的坑,    
 现在我们来分析一下spark on k8s的DynamicResourceAllocation这个坑

spark on yarn 中的DynamicResourceAllocation


spark on yarn对于DynamicResourceAllocation分配来说,从spark 1.2版本就已经开始支持了.

对于spark熟悉的人都知道,如果我们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务,

对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark应用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的效果


我们直接从CoarseGrainedExecutorBackend中SparkEnv创建开始说,每一个executor的启动,必然会经过CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的创建

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

而sparkEnv的创建就涉及到BlockManager的创建。沿着代码往下走,最终

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

在blockManager的initialize方法中,就会进行registerWithExternalShuffleServer

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
     registerWithExternalShuffleServer()
   }

如果我们开启了ExternalShuffleService,对于yarn就是YarnShuffleService,就会把当前的ExecutorShuffleInfo注册到host为shuffleServerId.host, port为shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)

这里我重点分析一下registerWithExternalShuffleServer的方法中的以下片段

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
         shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)            

该代码中shuffleServerId来自于:

shuffleServerId = if (externalShuffleServiceEnabled) {
     logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {
     blockManagerId
   }

而blockTransferService.hostName 是我们在SparkEnv中创建的时候由advertiseAddress传过来的,

最终由CoarseGrainedExecutorBackend 主类参数hostname过来的,那到底怎么传过来的呢?

参照ExecutorRunnable的prepareCommand方法,

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

而这个hostname的值最终由YarnAllocator的方法runAllocatedContainers

val executorHostname = container.getNodeId.getHost

传递过来的,也就是说我们最终获取到了yarn节点,也就是nodeManager的host

这样每个启动的executor,就向executor所在的nodeManager的YarnShuffleService注册了ExecutorShuffleInfo信息,这样对于开启了动态资源分配的

ExternalBlockStoreClient 来说fetchBlocksg过程就和未开启动态资源分配的NettyBlockTransferService大同小异了


spark on k8s(kubernetes) 中的DynamicResourceAllocation


参考之前的文章,我们知道在entrypoint中我们在启动executor的时候,我们传递了hostname参数

executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

而SPARK_EXECUTOR_POD_IP是运行中的POD IP,参考BasicExecutorFeatureStep类片段:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())

这样按照以上流程的分析,即使我们在每个k8s节点开启ExternalShuffleService服务,且pod挂载了持久化盘,

executor也不能向k8s节点ExternalShuffleService服务注册,因为我们注册的节点是POD IP,而不是节点IP,

当然spark社区早就提出了未开启external shuffle service的动态资源分配,且已经合并到master分支.

具体配置,可以参照如下:

spark.dynamicAllocation.enabled  true 
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  4
spark.dynamicAllocation.executorIdleTimeout  60s
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
4月前
|
存储 Kubernetes 网络安全
关于阿里云 Kubernetes 容器服务(ACK)添加镜像仓库的快速说明
本文介绍了在中国大陆地区因网络限制无法正常拉取 Docker 镜像的解决方案。作者所在的阿里云 Kubernetes 集群使用的是较旧版本的 containerd(1.2x),且无法直接通过 SSH 修改节点配置,因此采用了一种无需更改 Kubernetes 配置文件的方法。通过为 `docker.io` 添加 containerd 的镜像源,并使用脚本自动修改 containerd 配置文件中的路径错误(将错误的 `cert.d` 改为 `certs.d`),最终实现了通过多个镜像站点拉取镜像。作者还提供了一个可重复运行的脚本,用于动态配置镜像源。虽然该方案能缓解镜像拉取问题,
469 2
|
10月前
|
存储 Kubernetes 监控
K8s集群实战:使用kubeadm和kuboard部署Kubernetes集群
总之,使用kubeadm和kuboard部署K8s集群就像回归童年一样,简单又有趣。不要忘记,技术是为人服务的,用K8s集群操控云端资源,我们不过是想在复杂的世界找寻简单。尽管部署过程可能遇到困难,但朝着简化复杂的目标,我们就能找到意义和乐趣。希望你也能利用这些工具,找到你的乐趣,满足你的需求。
923 33
|
10月前
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
10月前
|
存储 人工智能 Kubernetes
ACK Gateway with AI Extension:面向Kubernetes大模型推理的智能路由实践
本文介绍了如何利用阿里云容器服务ACK推出的ACK Gateway with AI Extension组件,在Kubernetes环境中为大语言模型(LLM)推理服务提供智能路由和负载均衡能力。文章以部署和优化QwQ-32B模型为例,详细展示了从环境准备到性能测试的完整实践过程。
|
12月前
|
存储 运维 Kubernetes
正式开源,Doris Operator 支持高效 Kubernetes 容器化部署方案
飞轮科技推出了 Doris 的 Kubernetes Operator 开源项目(简称:Doris Operator),并捐赠给 Apache 基金会。该工具集成了原生 Kubernetes 资源的复杂管理能力,并融合了 Doris 组件间的分布式协同、用户集群形态的按需定制等经验,为用户提供了一个更简洁、高效、易用的容器化部署方案。
549 16
正式开源,Doris Operator 支持高效 Kubernetes 容器化部署方案
|
11月前
|
监控 Kubernetes Cloud Native
基于阿里云容器服务Kubernetes版(ACK)的微服务架构设计与实践
本文介绍了如何基于阿里云容器服务Kubernetes版(ACK)设计和实现微服务架构。首先概述了微服务架构的优势与挑战,如模块化、可扩展性及技术多样性。接着详细描述了ACK的核心功能,包括集群管理、应用管理、网络与安全、监控与日志等。在设计基于ACK的微服务架构时,需考虑服务拆分、通信、发现与负载均衡、配置管理、监控与日志以及CI/CD等方面。通过一个电商应用案例,展示了用户服务、商品服务、订单服务和支付服务的具体部署步骤。最后总结了ACK为微服务架构提供的强大支持,帮助应对各种挑战,构建高效可靠的云原生应用。
|
11月前
|
弹性计算 人工智能 资源调度
DeepSeek大解读系列公开课上新!阿里云专家主讲云上智能算力、Kubernetes容器服务、DeepSeek私有化部署
智猩猩「DeepSeek大解读」系列公开课第三期即将开讲,聚焦阿里云弹性计算助力大模型训练与部署。三位专家将分别讲解智能算力支撑、Kubernetes容器服务在AI场景的应用实践、以及DeepSeek一键部署和多渠道应用集成,分享云计算如何赋能大模型发展。欲观看直播,可关注【智猩猩GenAI视频号】预约。 (239字符)
|
10月前
|
存储 运维 Kubernetes
容器数据保护:基于容器服务 Kubernetes 版(ACK)备份中心实现K8s存储卷一键备份与恢复
阿里云ACK备份中心提供一站式容器化业务灾备及迁移方案,减少数据丢失风险,确保业务稳定运行。
|
人工智能 运维 监控
容器服务Kubernetes场景下可观测体系生产级最佳实践
阿里云容器服务团队在2024年继续蝉联Gartner亚洲唯一全球领导者象限,其可观测体系是运维的核心能力之一。该体系涵盖重保运维、大规模集群稳定性、业务异常诊断等场景,特别是在AI和GPU场景下提供了全面的观测解决方案。通过Tracing、Metric和Log等技术,阿里云增强了对容器网络、存储及多集群架构的监控能力,帮助客户实现高效运维和成本优化。未来,结合AI助手,将进一步提升问题定位和解决效率,缩短MTTR,助力构建智能运维体系。
|
分布式计算 资源调度 Kubernetes
Spark on Kubernetes 的现状与挑战
云原生时代,Kubernetes 的重要性日益凸显,这篇文章以 Spark 为例来看一下大数据生态 on Kubernetes 生态的现状与挑战。
2517 57

推荐镜像

更多