4.4 线程不足循环等待问题
如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败。
如果一个大的DAG中嵌套了很多子流程,如下图则会产生“死等”状态:
上图中MainFlowThread
等待SubFlowThread1
结束,SubFlowThread1
等待SubFlowThread2
结束, SubFlowThread2
等待SubFlowThread3
结束,而SubFlowThread3
等待线程池有新线程,则整个DAG
流程不能结束,从而其中的线程也不能释放。这样就形成的子父流程循环等待的状态。此时除非启动新的Master
来增加线程来打破这样的”僵局”,否则调度集群将不能再使用。
对于启动新Master来打破僵局,似乎有点差强人意,于是我们提出了以下三种方案来降低这种风险:
- 计算所有Master的线程总和,然后对每一个DAG需要计算其需要的线程数,也就是在DAG流程执行之前做预计算。因为是多Master线程池,所以总线程数不太可能实时获取。
- 对单Master线程池进行判断,如果线程池已经满了,则让线程直接失败。
- 增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起。这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行。
注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的。
于是我们选择了第三种方式来解决线程不足的问题。
4.5 容错设计
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
4.5.1 宕机容错
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
Master容错流程图:
ZooKeeper Master容错完成之后则重新由EasyScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
Worker容错流程图:
Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。
注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
4.5.2 任务失败重试
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
- 任务失败重试:是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
- 流程失败恢复:是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
- 流程失败重跑:也是流程级别的,是手动进行的,重跑是从开始节点进行
我们将工作流中的任务节点分了两种类型:
- 业务节点:这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
- 逻辑节点:这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。
每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作
5. 任务优先级设计
按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。
具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息
保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务。
其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST
、HIGH
、MEDIUM
、LOW
、LOWEST
。如下图:
任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
6. Logback和gRPC实现日志访问
由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:
- 将日志放到ES搜索引擎上
- 通过gRPC通信获取远程日志信息
介于考虑到尽可能的EasyScheduler
的轻量级性,所以选择了gRPC
实现远程访问日志信息。
我们使用自定义Logback
的FileAppender
和Filter
功能,实现每个任务实例生成一个日志文件。
FileAppender主要实现如下:
/** * task log appender */ public class TaskLogAppender extends FileAppender<ILoggingEvent> { ... @Override protected void append(ILoggingEvent event) { if (currentlyActiveFile == null){ currentlyActiveFile = getFile(); } String activeFile = currentlyActiveFile; // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId String threadName = event.getThreadName(); String[] threadNameArr = threadName.split("-"); // logId = processDefineId_processInstanceId_taskInstanceId String logId = threadNameArr[1]; ... super.subAppend(event); } }
以/流程定义id/流程实例id/任务实例id.log
的形式生成日志,过滤匹配以TaskLogInfo开始的线程名称:
TaskLogFilter实现如下:
/** * task log filter */ public class TaskLogFilter extends Filter<ILoggingEvent> { @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith("TaskLogInfo-")){ return FilterReply.ACCEPT; } return FilterReply.DENY; } }