SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)

背景


本文基于SPARK 3.3.0

在Spark 3.3.0中出现了一个新特性那就是自动重启Executor,这个主要解决是什么问题呢? 主要解决在Streaming中由于一个Executor的处理延迟导致整个Streaming任务延迟,但是这也是适用于批任务,使得批任务Executor的驱逐更加灵活。具体的可参考SPARK-37810


分析


在spark 3.3.0之前,如果发现任务是比较慢或者任务失败了,


可以开启spark.speculation(默认是关闭的),进行推测执行,

也可以开启spark.excludeOnFailure.enabled (默认是关闭的)以保证task不会重新调度到失败的Executor上

如果发现executor失败了,可以开启spark.excludeOnFailure.killExcludedExecutors(默认是关闭的),确保在fetch失败的时候,把execlude给删除掉。

但是这些都是事后的弥补方式,所以这里提出的Executor Rolling是事前预测执行的方式,该方式会周期性的轮询。

直接看代码ExecutorRollPlugin:

class ExecutorRollPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new ExecutorRollDriverPlugin()
  // No-op
  override def executorPlugin(): ExecutorPlugin = null
}

它继承SparkPlugin,关于SparkPlugin,可以参考spark 3.x Plugin Framework,总的来说,spark提供了一种插件机制,我们可以灵活的用它来做自己想要的事情,比如说 自定义指标等等。


我们看到这里executorPlugin方法是为null的,因为Executor的启动停止调度是在Driver进行的,所以executor根本不需要。

而对于ExecutorRollDriverPlugin:


class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
    if (interval <= 0) {
      logWarning(s"Disabled due to invalid interval value, '$interval'")
    } else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
      logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
    } else {
      minTasks = sc.conf.get(MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING)
      // Scheduler is not created yet
      sparkContext = sc
      val policy = ExecutorRollPolicy.withName(sc.conf.get(EXECUTOR_ROLL_POLICY))
      periodicService.scheduleAtFixedRate(() => {
        try {
          sparkContext.schedulerBackend match {
            case scheduler: KubernetesClusterSchedulerBackend =>
              val executorSummaryList = sparkContext
                .statusStore
                .executorList(true)
              choose(executorSummaryList, policy) match {
                case Some(id) =>
                  // Use decommission to be safe.
                  logInfo(s"Ask to decommission executor $id")
                  val now = System.currentTimeMillis()
                  scheduler.decommissionExecutor(
                    id,
                    ExecutorDecommissionInfo(s"Rolling via $policy at $now"),
                    adjustTargetNumExecutors = false)
                case _ =>
                  logInfo("There is nothing to roll.")
              }
            case _ =>
              logWarning("This plugin expects " +
                s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
          }
        } catch {
          case e: Throwable => logError("Error in rolling thread", e)
        }
      }, interval, interval, TimeUnit.SECONDS)
    }
    Map.empty[String, String].asJava
  }
....
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
      : Option[String] = {
    val listWithoutDriver = list
      .filterNot(_.id.equals(SparkContext.DRIVER_IDENTIFIER))
      .filter(_.totalTasks >= minTasks)
    val sortedList = policy match {
      case ExecutorRollPolicy.ID =>
        // We can convert to integer because EXECUTOR_ID_COUNTER uses AtomicInteger.
        listWithoutDriver.sortBy(_.id.toInt)
      case ExecutorRollPolicy.ADD_TIME =>
        listWithoutDriver.sortBy(_.addTime)
      case ExecutorRollPolicy.TOTAL_GC_TIME =>
        listWithoutDriver.sortBy(_.totalGCTime).reverse
      case ExecutorRollPolicy.TOTAL_DURATION =>
        listWithoutDriver.sortBy(_.totalDuration).reverse
      case ExecutorRollPolicy.AVERAGE_DURATION =>
        listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
      case ExecutorRollPolicy.FAILED_TASKS =>
        listWithoutDriver.sortBy(_.failedTasks).reverse
      case ExecutorRollPolicy.OUTLIER =>
        // If there is no outlier we fallback to TOTAL_DURATION policy.
        outliersFromMultipleDimensions(listWithoutDriver) ++
          listWithoutDriver.sortBy(_.totalDuration).reverse
      case ExecutorRollPolicy.OUTLIER_NO_FALLBACK =>
        outliersFromMultipleDimensions(listWithoutDriver)
    }
    sortedList.headOption.map(_.id)
  }

这里是periodicService单个线程定时触发,如果发现backend是k8s的话(所以目前只适用于spark on k8s),就会从已有的AppStatusStore(通过AppStatusListener机制获取到对应的Event,从而存储信息,目前来看,executor的metrics信息是通过heartbeat来传递到driver端的)存储中取出Executor的信息,进而根据配置的策略(Executor创建的ID,失败的task,GC时间等)进行驱逐。

当然在驱逐Executor的时候,也会考虑目前在Executor上运行的task的个数,具体配置为spark.kubernetes.executor.minTasksPerExecutorBeforeRolling(默认是0),只有小于等于该阈值,才会kill 对应的Executor,而且默认是只驱逐一个Executor。


该方式的优点:


  • 事前的处理方式,而不是事后处理
  • 独立于ExecutorPodsAllocator,使组件之间功能明确,便于代码维护
  • 适用于动态和静态Executor资源分配的场景
  • 驱逐策略根据运行时的统计信息来的,更加合理


具体使用方式,配置如下:


spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin
spark.decommission.enabled=true
spark.kubernetes.executor.rollInterval=3600s

当然,对于对于这种Executor驱逐,有其他公司也提出了不同的解决方法,如:SPARK-37028

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
2月前
|
Kubernetes 监控 安全
Kubernetes实战:集群管理与自动化技术详解
【6月更文挑战第27天】Kubernetes实战聚焦集群自动化管理,涵盖核心概念如主从架构、API Server及工作节点,强调自动扩缩容、RBAC安全控制与日志监控。通过IaC工具如Helm实现配置自动化,结合Prometheus等进行持续监控,强调安全策略与资源优化,展现K8s在现代应用管理中的威力。
|
3月前
|
消息中间件 运维 Kubernetes
构建高效自动化运维体系:Ansible与Kubernetes的融合实践
【5月更文挑战第9天】随着云计算和微服务架构的普及,自动化运维成为确保系统可靠性和效率的关键。本文将深入探讨如何通过Ansible和Kubernetes的集成,构建一个强大的自动化运维体系。我们将分析Ansible的配置管理功能以及Kubernetes容器编排的优势,并展示如何将二者结合,以实现持续部署、快速扩展和高效管理现代云原生应用。文章还将涵盖实际案例,帮助读者理解在真实环境下如何利用这些工具优化运维流程。
|
11天前
|
运维 Kubernetes Cloud Native
OpenKruise:云原生应用自动化的超级引擎,让Kubernetes焕发超能力!
【8月更文挑战第8天】在现代云计算中,云原生应用借助Kubernetes实现了标准化部署。OpenKruise作为扩展工具库,增强了Kubernetes的功能,提供自动化管理复杂应用的能力。通过兼容的控制器、CRDs及Operator模式,OpenKruise简化了应用操作。用户可通过Helm安装,并利用如CloneSet等功能高效复制与管理Pods,从而专注于业务开发而非运维细节,提升云原生应用的灵活性与效率。
34 6
|
12天前
|
运维 Kubernetes 监控
云原生时代的运维革新:Kubernetes的自动化之旅
在云原生技术不断演进的今天,Kubernetes已成为容器编排的事实标准。本文将深入探讨Kubernetes如何通过自动化工具和实践,为运维团队提供高效率、高可用性的解决方案,从而推动运维工作向更高效、智能化的方向转型。
30 6
|
1月前
|
Kubernetes 持续交付 Python
Kubernetes(通常简称为K8s)是一个开源的容器编排系统,用于自动化部署、扩展和管理容器化应用程序。
Kubernetes(通常简称为K8s)是一个开源的容器编排系统,用于自动化部署、扩展和管理容器化应用程序。
|
1月前
|
Kubernetes Cloud Native 持续交付
云原生架构的核心组成部分通常包括容器化(如Docker)、容器编排(如Kubernetes)、微服务架构、服务网格、持续集成/持续部署(CI/CD)、自动化运维(如Prometheus监控和Grafana可视化)等。
云原生架构的核心组成部分通常包括容器化(如Docker)、容器编排(如Kubernetes)、微服务架构、服务网格、持续集成/持续部署(CI/CD)、自动化运维(如Prometheus监控和Grafana可视化)等。
|
2月前
|
分布式计算 Kubernetes Spark
大数据之spark on k8s
大数据之spark on k8s
|
3月前
|
运维 Kubernetes 调度
【kubernetes】关于k8s集群的污点、容忍、驱逐以及k8s集群故障排查思路
【kubernetes】关于k8s集群的污点、容忍、驱逐以及k8s集群故障排查思路
|
3月前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于Docker和Kubernetes的实践
【5月更文挑战第30天】 在当今的快速迭代和持续部署的软件发布环境中,自动化运维的重要性愈发凸显。本文旨在探讨如何利用容器化技术与微服务架构,特别是Docker和Kubernetes,来构建一个高效、可伸缩且自愈的自动化运维体系。通过详细分析容器化的优势及Kubernetes的集群管理机制,文章将提供一个清晰的指南,帮助读者理解并实现现代软件部署的最佳实践。
|
3月前
|
分布式计算 Kubernetes 监控
容器服务Kubernetes版产品使用合集之怎么实现把 spark 跑在k8s
容器服务Kubernetes版,作为阿里云提供的核心服务之一,旨在帮助企业及开发者高效管理和运行Kubernetes集群,实现应用的容器化与微服务化。以下是关于使用这些服务的一些建议和合集,涵盖基本操作、最佳实践、以及一些高级功能的使用方法。

推荐镜像

更多