一.引言
程序启动 M 个 Executor,每个 Executor 共 N core,即每个 Executor 上的 task = N,现在有一个 object 需要在每一个 task 上初始化公用变量,有两个问题需要解决:
A.该公用变量在 executor 上共用,所以 N 个 task 只需要有一个构造,其他 N-1 个 task 共用即可
编辑
B.初始化阶段,如果报错则至少会收到 M 个相同的报警日志,如何压缩到只收一次
编辑
二.一个 Task 初始化
Executor 上 N 个 task 都需要 InitValue 这个变量,所以 SelfObject 初始化一次 N 个 task 就都可以使用,所以这里要采用类似 singleton 单例类的写法,保证 N 个 task 只有1个 task 初始化,其他 N-1 个 Task 等这一个 Task 初始化即可。基于上述描述,通过加锁的方式可以实现需求:
1.主函数侧修改
编辑
通过 synchornized 关键字,保证只有一个 task 上执行 init 初始化逻辑,这样即保证了 N-1 个 task 等待 1个 task 初始化,随后公用的原则
2.初始化函数侧修改
为了保证 Singleton,SelfObject 内执行 init 方法时需要判断 initValue 是否为 null,从而保证对应变量只会被初始化一次,这里偷了个懒没有使用双重检测,有需要的同学也可以自己修改为双重检测的 Singleton。
编辑
通过主函数端 Synchornized + 初始化函数端 Singleton 的配合,我们达成了上面要求的第一个目标,Executor 上只有一个 Task 初始化对应变量,避免 N 个 task 一起初始化可能造成的并发问题,且节省了初始化的开销,非常的奈斯 👍。
三.一次报警
上面我们可以看到 init 函数内包含了 try-catch 逻辑,这是因为我们 initValue 初始化时有可能发生异常,为了提高响应速度,我们在 case 中加入报警策略,这样任务初始化变量异常时,程序员即可第一时间获得报警从而及时修复任务:
编辑
这样修改后新的问题又来了,虽然上面一个 task 初始化保证了 N 个 task 只有一个执行 init 逻辑,最多报警一次,但是由于有 M 个 Executor,所以如果出错时将会收到 M 次报警,如果 M 非常大,显然这不是一个好的方法,所以我们还需要解决第二个问题,M 个 Executor 只有一个 Executor 发送报警即可。
1.复杂版
A.如何保证一次
遇到这个问题,第一反应是通过 Executor 和 Driver 通信,Executor 端初始化出错后,自己不报错,而是将信息同步至 Driver,由于 Dirver 是单线程的,所以让 Driver 进行单独报错,这样可以保证只发一次报警。
B.Executor-Driver 通信
上面的方法可行,接下来就需要寻找 Executor-Driver 通信的手段了,这里由于平常经常计数,所以第一时间想到了 sc.longAccumulator,如果 Executor 上初始化异常,则对 sc.longAccumulator 进行 add 操作,在 Driver 上启动一个常驻线程定时监测 sc.longAccumulator 的值,如果该值超过 0 则代表 Executor 上 init 出错,这时候报警随后将该线程退出即可。
C.实现
这里将监控逻辑的线程添加至 sparkContext 与 RDD 逻辑之间,常驻线程池内定义了 state 变量,保证报警一次后后续不再报警,epoch + sleep 则控制检测的次数和频率,例如初始化任务执行10min,那么可以 10x1min 的频率检测,也可以 20x30second 的频率检测,取决于自己的节奏。同时需要将 monitorNum 传入 init 函数,如果初始化异常则执行 monitorNum.add 的操作,这样 driver 端的常驻线程即可感知并完成报警一次的操作。
编辑
Tips:
为什么常驻线程选择 newCachedThreadPool,因为 newCachedThreadPool 线程在空闲 60s 后可以完全退出,即避免 Dirver 端检测线程一直运行对 driver 端造成影响,更详细的 newCachedThreadPool 使用可以参考:Executor - 一文搞懂 ThreadPoolExecutor 与 BlockingQueue。最后贴下完整报警代码:
// 添加监控 val monitorNum = sc.longAccumulator("Model Monitor") val cachedPool: ExecutorService = Executors.newCachedThreadPool() cachedPool.execute(new Runnable { override def run(): Unit = { // 报警1次或9次没报警退出,60s后线程池退出 var state = true var epoch = 0 while (state && epoch < 9) { if (monitorNum.value > 0) { SendMailUtil.send("Object 初始化 initValue", "初始化异常!") state = false } else { Thread.sleep(60000) epoch += 1 } } } })
2.简单好用版 👍
上面的方法可以解决只发一次报警的问题,但是相对比较复杂,且无法传递 Executor 具体报错信息,只能通过 add 的数字变化感知到报错但无法报错具体初始化错误。有一种更好的办法:指定一个 Executor 发报警,剩下的 Executor 忽略。
A.指定 Executor 报警 (推荐👍)
Spark 可以通过 SparkEnv 上下文变量获取对应 ExecutorId,只需指定一个 Executor 发报警即可
import org.apache.spark.SparkEnv val executorId = SparkEnv.get.executorId
编辑
只需要 if 即可,除此之外,还可以根据 e.getMessage 获取报错相关信息,通过报警函数发送,相比与之前单独起线程的方法,省事了很多。
Tips:
针对单轮次的任务,这样指定问题不大,但是如果是循环执行,由于 Executor 挂掉后会起新的 Executor,且 ExecutorId 是累加的 (例如申请10个 Executor,1号 Executor 挂掉了,这时候会新起一个 11号 Executor,而不是重新启动 1号 Executor),这时候再用 Executor == "1" 判断就有风险了,所以如果任务 Executor 是轮循的且 Executor 有挂掉的风险,建议使用 PartitionId 或者上面复杂的版本,但整体而言这个方法最简单且大部分时候有效。
B.指定 TaskId 报警
上面通过 SparkEnv.get 获取 ExecutorId 进行 if 判断并报警,有些同学一定有疑问,可不可以使用 TaskContext 获取 taskId,指定某个 task 发送报警:
import org.apache.spark.TaskContext val taskId = TaskContext.getPartitionId()
其实也没问题,但是上面的场景不支持,因为使用了 synchronized 关键字,所以初始化时调用的 task 不固定,所以如果 if 指定 taskA 报警,而实际执行 init 的是 taskB,则 taskA 无法感知报错从而发送报警,如果在所有 task 上初始化则没有问题,当然也可以指定一个 PartionId 范围,这样会避免同步锁导致的单一 TaskId 不命中的问题。
编辑
C.在页面查看 ExecutorId
指定 ExecutorId 报警后,如果想到对应 Executor 查看 e.printStackTrace 的异常栈怎么找:
编辑
可以打开 spark 日志界面,前面第一列即为 ExecutorId,如果指定 if (executorId == "1") 且 init 阶段报错,接到告警信息后即可第一时间查看 executorId = 1 的 stderr 日志定位 init 初始化阶段的问题,非常的便捷。
四.总结
如何在 Executor 端进行单独初始化 M -> 1,以及如何将 Task 端的错误 MxN -> 1 发送单次报警大致就这么多内容,这里还有一个问题需要解释下,为什么 SelfObject 不在 Dirver 端直接 init 并广播,这样不就直接保证了初始化1次和异常报警1次的需求:这里因为一些 initValue 是官方定义的 class 且不支持序列化,例如 Tensorflow 的 SavedModelBundle ,针对这样的变量如果在 Driver 端初始化并广播则会报 class 无法序列化的问题,所以才需要将 initValue 从 Dirver 初始化挪到 Executor 初始化。同时也有一些反思,面对上述问题,第一时间想到的是 dirver 与 executor 通信并起线程监控的方法,把简单问题复杂化的同时也引入了不必要的资源和时间浪费,还是要透过现象看本质,针对主要矛盾下手,这样可以快速找到最简单可靠的方法。