Spark - Executor 初始化 && 报警都进行1次

简介: 程序启动 M个 Executor,每个 Executor 共 N core,即每个 Executor 上的 task = N,现在有一个 object 需要在每一个 task 上初始化公用变量,本文介绍如何只初始化一次以及异常情况下只报警一次的方法。

一.引言

程序启动 M 个 Executor,每个 Executor 共 N core,即每个 Executor 上的 task = N,现在有一个 object 需要在每一个 task 上初始化公用变量,有两个问题需要解决:

A.该公用变量在 executor 上共用,所以 N 个 task 只需要有一个构造,其他 N-1 个 task 共用即可

image.gif编辑

B.初始化阶段,如果报错则至少会收到 M 个相同的报警日志,如何压缩到只收一次

image.gif编辑

二.一个 Task 初始化

Executor 上 N 个 task 都需要 InitValue 这个变量,所以 SelfObject 初始化一次 N 个 task 就都可以使用,所以这里要采用类似 singleton 单例类的写法,保证 N 个 task 只有1个 task 初始化,其他 N-1 个 Task 等这一个 Task 初始化即可。基于上述描述,通过加锁的方式可以实现需求:

1.主函数侧修改

image.gif编辑

通过 synchornized 关键字,保证只有一个 task 上执行 init 初始化逻辑,这样即保证了 N-1 个 task 等待 1个 task 初始化,随后公用的原则

2.初始化函数侧修改

为了保证 Singleton,SelfObject 内执行 init 方法时需要判断 initValue 是否为 null,从而保证对应变量只会被初始化一次,这里偷了个懒没有使用双重检测,有需要的同学也可以自己修改为双重检测的 Singleton。

image.gif编辑

通过主函数端 Synchornized + 初始化函数端 Singleton 的配合,我们达成了上面要求的第一个目标,Executor 上只有一个 Task 初始化对应变量,避免 N 个 task 一起初始化可能造成的并发问题,且节省了初始化的开销,非常的奈斯 👍。

三.一次报警

上面我们可以看到 init 函数内包含了 try-catch 逻辑,这是因为我们 initValue 初始化时有可能发生异常,为了提高响应速度,我们在 case 中加入报警策略,这样任务初始化变量异常时,程序员即可第一时间获得报警从而及时修复任务:

image.gif编辑

这样修改后新的问题又来了,虽然上面一个 task 初始化保证了 N 个 task 只有一个执行 init 逻辑,最多报警一次,但是由于有 M 个 Executor,所以如果出错时将会收到 M 次报警,如果 M 非常大,显然这不是一个好的方法,所以我们还需要解决第二个问题,M 个 Executor 只有一个 Executor 发送报警即可。

1.复杂版

A.如何保证一次

遇到这个问题,第一反应是通过 Executor 和 Driver 通信,Executor 端初始化出错后,自己不报错,而是将信息同步至 Driver,由于 Dirver 是单线程的,所以让 Driver 进行单独报错,这样可以保证只发一次报警。

B.Executor-Driver 通信

上面的方法可行,接下来就需要寻找 Executor-Driver 通信的手段了,这里由于平常经常计数,所以第一时间想到了 sc.longAccumulator,如果 Executor 上初始化异常,则对 sc.longAccumulator 进行 add 操作,在 Driver 上启动一个常驻线程定时监测 sc.longAccumulator 的值,如果该值超过 0 则代表 Executor 上 init 出错,这时候报警随后将该线程退出即可。

C.实现

这里将监控逻辑的线程添加至 sparkContext 与 RDD 逻辑之间,常驻线程池内定义了 state 变量,保证报警一次后后续不再报警,epoch + sleep 则控制检测的次数和频率,例如初始化任务执行10min,那么可以 10x1min 的频率检测,也可以 20x30second 的频率检测,取决于自己的节奏。同时需要将 monitorNum 传入 init 函数,如果初始化异常则执行  monitorNum.add 的操作,这样 driver 端的常驻线程即可感知并完成报警一次的操作。

image.gif编辑

Tips:

为什么常驻线程选择 newCachedThreadPool,因为 newCachedThreadPool 线程在空闲 60s 后可以完全退出,即避免 Dirver 端检测线程一直运行对 driver 端造成影响,更详细的 newCachedThreadPool 使用可以参考:Executor - 一文搞懂 ThreadPoolExecutor 与 BlockingQueue。最后贴下完整报警代码:

// 添加监控
    val monitorNum = sc.longAccumulator("Model Monitor")
    val cachedPool: ExecutorService = Executors.newCachedThreadPool()
    cachedPool.execute(new Runnable {
      override def run(): Unit = {
        // 报警1次或9次没报警退出,60s后线程池退出
        var state = true
        var epoch = 0
        while (state && epoch < 9) {
          if (monitorNum.value > 0) {
            SendMailUtil.send("Object 初始化 initValue", "初始化异常!")
            state = false
          } else {
            Thread.sleep(60000)
            epoch += 1
          }
        }
      }
    })

image.gif

2.简单好用版 👍

上面的方法可以解决只发一次报警的问题,但是相对比较复杂,且无法传递 Executor 具体报错信息,只能通过 add 的数字变化感知到报错但无法报错具体初始化错误。有一种更好的办法:指定一个 Executor 发报警,剩下的 Executor 忽略。

A.指定 Executor 报警 (推荐👍)

Spark 可以通过 SparkEnv 上下文变量获取对应 ExecutorId,只需指定一个 Executor 发报警即可

import org.apache.spark.SparkEnv
    val executorId = SparkEnv.get.executorId

image.gif

image.gif编辑

只需要 if 即可,除此之外,还可以根据 e.getMessage 获取报错相关信息,通过报警函数发送,相比与之前单独起线程的方法,省事了很多。

Tips:

针对单轮次的任务,这样指定问题不大,但是如果是循环执行,由于 Executor 挂掉后会起新的 Executor,且 ExecutorId 是累加的 (例如申请10个 Executor,1号 Executor 挂掉了,这时候会新起一个 11号 Executor,而不是重新启动 1号 Executor),这时候再用 Executor == "1" 判断就有风险了,所以如果任务 Executor 是轮循的且 Executor 有挂掉的风险,建议使用 PartitionId 或者上面复杂的版本,但整体而言这个方法最简单且大部分时候有效。

B.指定 TaskId 报警

上面通过 SparkEnv.get 获取 ExecutorId 进行 if 判断并报警,有些同学一定有疑问,可不可以使用 TaskContext 获取 taskId,指定某个 task 发送报警:

import org.apache.spark.TaskContext
    val taskId = TaskContext.getPartitionId()

image.gif

其实也没问题,但是上面的场景不支持,因为使用了 synchronized 关键字,所以初始化时调用的 task 不固定,所以如果 if 指定 taskA 报警,而实际执行 init 的是 taskB,则 taskA 无法感知报错从而发送报警,如果在所有 task 上初始化则没有问题,当然也可以指定一个 PartionId 范围,这样会避免同步锁导致的单一 TaskId 不命中的问题。

image.gif编辑

C.在页面查看 ExecutorId

指定 ExecutorId 报警后,如果想到对应 Executor 查看 e.printStackTrace 的异常栈怎么找:

image.gif编辑

可以打开 spark 日志界面,前面第一列即为 ExecutorId,如果指定 if (executorId == "1") 且 init 阶段报错,接到告警信息后即可第一时间查看 executorId = 1 的 stderr 日志定位 init 初始化阶段的问题,非常的便捷。

四.总结

如何在 Executor 端进行单独初始化 M -> 1,以及如何将 Task 端的错误 MxN -> 1 发送单次报警大致就这么多内容,这里还有一个问题需要解释下,为什么 SelfObject 不在 Dirver 端直接 init 并广播,这样不就直接保证了初始化1次和异常报警1次的需求:这里因为一些 initValue 是官方定义的 class 且不支持序列化,例如 Tensorflow 的 SavedModelBundle ,针对这样的变量如果在 Driver 端初始化并广播则会报 class 无法序列化的问题,所以才需要将 initValue 从 Dirver 初始化挪到 Executor 初始化。同时也有一些反思,面对上述问题,第一时间想到的是 dirver 与 executor 通信并起线程监控的方法,把简单问题复杂化的同时也引入了不必要的资源和时间浪费,还是要透过现象看本质,针对主要矛盾下手,这样可以快速找到最简单可靠的方法。

目录
相关文章
|
存储 缓存 分布式计算
Spark的Driver和Executor
Spark的Driver和Executor
716 0
|
5月前
|
分布式计算 Java Spark
Spark Driver和Executor数据传递使用问题
Spark Driver和Executor数据传递使用问题
32 0
|
存储 分布式计算 Kubernetes
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
161 0
|
分布式计算 Spark C++
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
|
分布式计算 Spark
Spark Executor启动源码分析
Spark CoarseGrainedExecutorBackend启动源码分析 更多资源 github: https://github.
1376 0
|
分布式计算 Spark 存储
Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系
梳理一下Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系。
1501 0
|
分布式计算 Java Shell
[Spark]Spark RDD 指南二 初始化
1. 初始化 Spark程序必须做的第一件事是创建一个JavaSparkContext对象(Scala和Python中是SparkContext对象),它告诉Spark如何访问集群。
1163 0
|
分布式计算 Spark Java