SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)

简介: SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)

背景

本文基于SPARK 3.3.0

在Spark 3.3.0中出现了一个新特性那就是自动重启Executor,这个主要解决是什么问题呢? 主要解决在Streaming中由于一个Executor的处理延迟导致整个Streaming任务延迟,但是这也是适用于批任务,使得批任务Executor的驱逐更加灵活。具体的可参考SPARK-37810


分析

在spark 3.3.0之前,如果发现任务是比较慢活着任务失败了,


可以开启spark.speculation(默认是关闭的),进行推测执行,

也可以开启spark.excludeOnFailure.enabled (默认是关闭的)以保证task不会重新调度到失败的Executor上


如果发现executor失败了,可以开启spark.excludeOnFailure.killExcludedExecutors(默认是关闭的),确保在fetch失败的时候,把execlude给删除掉。


但是这些都是事后的弥补方式,所以这里提出的Executor Rolling是事前预测执行的方式,该方式会周期性的轮询。


直接看代码ExecutorRollPlugin:


class ExecutorRollPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new ExecutorRollDriverPlugin()
  // No-op
  override def executorPlugin(): ExecutorPlugin = null
}

它继承SparkPlugin,关于SparkPlugin,可以参考spark 3.x Plugin Framework,总的来说,spark提供了一种插件机制,我们可以灵活的用它来做自己想要的事情,比如说 自定义指标等等。


我们看到这里executorPlugin方法是为null的,因为Executor的启动停止调度是在Driver进行的,所以executor根本不需要。

而对于ExecutorRollDriverPlugin:


class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
    if (interval <= 0) {
      logWarning(s"Disabled due to invalid interval value, '$interval'")
    } else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
      logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
    } else {
      minTasks = sc.conf.get(MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING)
      // Scheduler is not created yet
      sparkContext = sc
      val policy = ExecutorRollPolicy.withName(sc.conf.get(EXECUTOR_ROLL_POLICY))
      periodicService.scheduleAtFixedRate(() => {
        try {
          sparkContext.schedulerBackend match {
            case scheduler: KubernetesClusterSchedulerBackend =>
              val executorSummaryList = sparkContext
                .statusStore
                .executorList(true)
              choose(executorSummaryList, policy) match {
                case Some(id) =>
                  // Use decommission to be safe.
                  logInfo(s"Ask to decommission executor $id")
                  val now = System.currentTimeMillis()
                  scheduler.decommissionExecutor(
                    id,
                    ExecutorDecommissionInfo(s"Rolling via $policy at $now"),
                    adjustTargetNumExecutors = false)
                case _ =>
                  logInfo("There is nothing to roll.")
              }
            case _ =>
              logWarning("This plugin expects " +
                s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
          }
        } catch {
          case e: Throwable => logError("Error in rolling thread", e)
        }
      }, interval, interval, TimeUnit.SECONDS)
    }
    Map.empty[String, String].asJava
  }
....
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
      : Option[String] = {
    val listWithoutDriver = list
      .filterNot(_.id.equals(SparkContext.DRIVER_IDENTIFIER))
      .filter(_.totalTasks >= minTasks)
    val sortedList = policy match {
      case ExecutorRollPolicy.ID =>
        // We can convert to integer because EXECUTOR_ID_COUNTER uses AtomicInteger.
        listWithoutDriver.sortBy(_.id.toInt)
      case ExecutorRollPolicy.ADD_TIME =>
        listWithoutDriver.sortBy(_.addTime)
      case ExecutorRollPolicy.TOTAL_GC_TIME =>
        listWithoutDriver.sortBy(_.totalGCTime).reverse
      case ExecutorRollPolicy.TOTAL_DURATION =>
        listWithoutDriver.sortBy(_.totalDuration).reverse
      case ExecutorRollPolicy.AVERAGE_DURATION =>
        listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
      case ExecutorRollPolicy.FAILED_TASKS =>
        listWithoutDriver.sortBy(_.failedTasks).reverse
      case ExecutorRollPolicy.OUTLIER =>
        // If there is no outlier we fallback to TOTAL_DURATION policy.
        outliersFromMultipleDimensions(listWithoutDriver) ++
          listWithoutDriver.sortBy(_.totalDuration).reverse
      case ExecutorRollPolicy.OUTLIER_NO_FALLBACK =>
        outliersFromMultipleDimensions(listWithoutDriver)
    }
    sortedList.headOption.map(_.id)
  }

这里是periodicService单个线程定时触发,如果发现backend是k8s的话(所以目前只适用于spark on k8s),就会从已有的AppStatusStore(通过AppStatusListener机制获取到对应的Event,从而存储信息,目前来看,executor的metrics信息是通过heartbeat来传递到driver端的)存储中取出Executor的信息,进而根据配置的策略(Executor创建的ID,失败的task,GC时间等)进行驱逐。

当然在驱逐Executor的时候,也会考虑目前在Executor上运行的task的个数,具体配置为spark.kubernetes.executor.minTasksPerExecutorBeforeRolling(默认是0),只有小于等于该阈值,才会kill 对应的Executor,而且默认是只驱逐一个Executor。


该方式的优点:


  • 事前的处理方式,而不是事后处理
  • 独立于ExecutorPodsAllocator,使组件之间功能明确,便于代码维护
  • 适用于动态和静态Executor资源分配的场景
  • 驱逐策略根据运行时的统计信息来的,更加合理


具体使用方式,配置如下:

spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin
spark.decommission.enabled=true
spark.kubernetes.executor.rollInterval=3600s


相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
存储 Java 数据库
javax.security.auth.login.LoginException: Message stream modified (41)
`亲测可用,之前搜索了很多博客,啥样的都有,就是不介绍报错以及配置用处,根本不懂照抄那些配置是干啥的,稀里糊涂的按照博客搭完也跑不起来,因此记录这个。` `项目背景`:公司项目当前采用http协议+shiro+mysql的登录认证方式,而现在想支持ldap协议认证登录然后能够访问自己公司的项目网站。 `举例说明`:假设我们公司有自己的门户网站,现在我们收购了一家公司,他们数据库采用ldap存储用户数据,那么为了他们账户能登陆我们公司项目所以需要集成,而不是再把他们的账户重新在mysql再创建一遍,万一人家有1W个账户呢,不累死了且也不现实啊。
269 7
|
索引 Python
真的!千万不要忽略这些python常见报错信息_nameerror name ‘a‘ is not defined
真的!千万不要忽略这些python常见报错信息_nameerror name ‘a‘ is not defined
|
JSON 数据格式
Sublime Text 查找的内容 高亮显示
Sublime Text 查找的内容 高亮显示
1739 0
Sublime Text 查找的内容 高亮显示
|
安全 Java Spring
Spring Boot 关闭 Actuator ,满足安全工具扫描
Spring Boot 关闭 Actuator ,满足安全工具扫描
2173 0
|
存储 Prometheus 监控
使用 Docker 部署 Prometheus + Grafana 监控平台
Prometheus(普罗米修斯R)是一套开源的监控&报警&时间序列数据库的组合,由SoundCloud公司开发。
20404 4
使用 Docker 部署 Prometheus + Grafana 监控平台
|
存储 分布式计算 Kubernetes
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
365 0
|
数据安全/隐私保护
|
Java Android开发 p3c
《阿里巴巴Java开发规约》插件使用介绍
一、简介     阿里巴巴于10月14日在杭州云栖大会上,正式发布了《阿里巴巴Java开发规约》扫描插件!该插件基于《阿里巴巴Java开发规约》手册内容,在扫描代码后,将不符合规约的代码按Blocker/Critical/Major三个等级显示在下方,甚至在IDEA上,还基于Inspection机制提供了实时检测功能,编写代码的同时也能快速发现问题所在。
4894 0
|
3天前
|
云安全 人工智能 算法
以“AI对抗AI”,阿里云验证码进入2.0时代
三层立体防护,用大模型打赢人机攻防战
1301 3