DolphinScheduler教程(02)- 系统架构设计(下)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: DolphinScheduler教程(02)- 系统架构设计(下)

4.4 线程不足循环等待问题

如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败

如果一个大的DAG中嵌套了很多子流程,如下图则会产生“死等”状态:

上图中MainFlowThread等待SubFlowThread1结束,SubFlowThread1等待SubFlowThread2结束, SubFlowThread2等待SubFlowThread3结束,而SubFlowThread3等待线程池有新线程,则整个DAG流程不能结束,从而其中的线程也不能释放。这样就形成的子父流程循环等待的状态。此时除非启动新的Master来增加线程来打破这样的”僵局”,否则调度集群将不能再使用。

对于启动新Master来打破僵局,似乎有点差强人意,于是我们提出了以下三种方案来降低这种风险:

  1. 计算所有Master的线程总和,然后对每一个DAG需要计算其需要的线程数,也就是在DAG流程执行之前做预计算。因为是多Master线程池,所以总线程数不太可能实时获取。
  2. 对单Master线程池进行判断,如果线程池已经满了,则让线程直接失败。
  3. 增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起。这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行。

注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的。

于是我们选择了第三种方式来解决线程不足的问题。

4.5 容错设计

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况

4.5.1 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。

Master容错流程图:

ZooKeeper Master容错完成之后则重新由EasyScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

Worker容错流程图:

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。

注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。

4.5.2 任务失败重试

这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

  • 任务失败重试:是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
  • 流程失败恢复:是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
  • 流程失败重跑:也是流程级别的,是手动进行的,重跑是从开始节点进行

我们将工作流中的任务节点分了两种类型:

  • 业务节点:这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
  • 逻辑节点:这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作

5. 任务优先级设计

按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。

具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务。

其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHESTHIGHMEDIUMLOWLOWEST。如下图:

任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

6. Logback和gRPC实现日志访问

由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:

  • 将日志放到ES搜索引擎上
  • 通过gRPC通信获取远程日志信息

介于考虑到尽可能的EasyScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

我们使用自定义LogbackFileAppenderFilter功能,实现每个任务实例生成一个日志文件。

FileAppender主要实现如下:

/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {
    ...
   @Override
   protected void append(ILoggingEvent event) {
       if (currentlyActiveFile == null){
           currentlyActiveFile = getFile();
       }
       String activeFile = currentlyActiveFile;
       // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
       String threadName = event.getThreadName();
       String[] threadNameArr = threadName.split("-");
       // logId = processDefineId_processInstanceId_taskInstanceId
       String logId = threadNameArr[1];
       ...
       super.subAppend(event);
   }
}

/流程定义id/流程实例id/任务实例id.log的形式生成日志,过滤匹配以TaskLogInfo开始的线程名称

TaskLogFilter实现如下:

/**
*  task log filter
*/
public class TaskLogFilter extends Filter<ILoggingEvent> {
   @Override
   public FilterReply decide(ILoggingEvent event) {
       if (event.getThreadName().startsWith("TaskLogInfo-")){
           return FilterReply.ACCEPT;
       }
       return FilterReply.DENY;
   }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
打赏
0
0
0
0
237
分享
相关文章
Kotlin教程笔记 - MVP与MVVM架构设计的对比
Kotlin教程笔记 - MVP与MVVM架构设计的对比
112 4
Kotlin教程笔记 - 适合构建中大型项目的架构模式全面对比
Kotlin教程笔记 - 适合构建中大型项目的架构模式全面对比
54 3
Kotlin教程笔记 - MVVM架构怎样避免内存泄漏
Kotlin教程笔记 - MVVM架构怎样避免内存泄漏
46 2
【上云基础系列 02-01】通过SLB+1台ECS+ESS弹性伸缩,搭建一个精简版的上云标准弹性架构(含方案及教程)
通常,构建一个弹性架构(即使是一个最基础的入门版),至少需要2台ECS。但是,很多小微企业刚开始上云的时候,为了节省成本不愿意购买更多的服务器。通过 “ALB+ESS弹性伸缩+1台ECS+RDS”方案,在保障低成本的同时,也不牺牲业务架构的弹性设计,更避免了很多人因为节省成本选择了单体架构后频繁改造架构的困局。 方案中的几个设计非常值得小微企业借鉴:(1)通过ALB/RDS的按量付费,节省了初期流量不大时的费用;(2)通过ESS弹性伸缩,不需要提前购买服务器资源,但是当业务增长或减少时却保持了资源弹性自动扩缩容。
PolarDB 开源基础教程系列 1 架构解读
PolarDB 是阿里云研发的云原生分布式数据库,基于 PostgreSQL 开源版本,旨在解决传统数据库在大规模数据和高并发场景下的性能和扩展性问题。其主要特点包括: 1. **存储计算分离架构**:通过将计算与存储分离,实现极致弹性、共享一份数据以降低成本、透明读写分离。 2. **HTAP 架构**:支持混合事务处理和分析处理(HTAP),能够在同一系统中高效执行 OLTP 和 OLAP 查询。 3. **优化的日志复制机制**:采用只复制元数据的方式减少网络传输量,优化页面回放和 DDL 锁回放过程。 4. **并行查询与索引创建**:引入 MPP 分布式执行引擎。
32 7
Kotlin教程笔记(80) - MVVM架构设计
Kotlin教程笔记(80) - MVVM架构设计
45 1
Kotlin教程笔记 - MVVM架构怎样避免内存泄漏
Kotlin教程笔记 - MVVM架构怎样避免内存泄漏

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等