01 引言
Apache DolphinScheduler官方文档地址:https://dolphinscheduler.apache.org/zh-cn/index.html
在前面的博客,已经讲解了DolphinScheduler相关的概念,有兴趣的同学可以参考下:
Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
其原理图如下:
接下来,本文一步一步详细地讲解其源码。
02 DolphinScheduler 项目结构
2.1 结构分析
首先使用IDEA
从Github clone
项目到本地,项目地址:https://github.com/apache/dolphinscheduler
导入项目后,可以看到其主要核心模块如下:
模块 | 描述 |
dolphinscheduler-alert | 告警模块,提供 AlertServer 服务。 |
dolphinscheduler-api | web应用模块,提供 ApiServer 服务。 |
dolphinscheduler-common | 通用的常量枚举、工具类、数据结构或者基类 |
dolphinscheduler-dao | 提供数据库访问等操作。 |
dolphinscheduler-remote | 基于 netty 的客户端、服务端 |
dolphinscheduler-server | MasterServer 和 WorkerServer 服务 |
dolphinscheduler-service | service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用 |
dolphinscheduler-ui | 前端模块 |
2.2 表分析
在项目/dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql
目录下,有数据库初始化脚本dolphinscheduler_ddl.sql及dolphinscheduler_dml.sql脚本以及一些升级脚本:
执行完后,可以在数据库里看到有如下表:
表名 | 表信息 |
t_ds_access_token | 访问ds后端的token |
t_ds_alert | 告警信息 |
t_ds_alertgroup | 告警组 |
t_ds_command | 执行命令 |
t_ds_datasource | 数据源 |
t_ds_error_command(核心表) | 错误命令 |
t_ds_process_definition(核心表) | 流程定义 |
t_ds_process_instance(核心表) | 流程实例 |
t_ds_project | 项目 |
t_ds_queue | 队列 |
t_ds_relation_datasource_user | 用户关联数据源 |
t_ds_relation_process_instance | 子流程 |
t_ds_relation_project_user | 用户关联项目 |
t_ds_relation_resources_user | 用户关联资源 |
t_ds_relation_udfs_user | 用户关联UDF函数 |
t_ds_relation_user_alertgroup | 用户关联告警组 |
t_ds_resources | 资源文件 |
t_ds_schedules(核心表) | 流程定时调度 |
t_ds_session | 用户登录的session |
t_ds_task_instance(核心表) | 任务实例 |
t_ds_tenant | 租户 |
t_ds_udfs | UDF资源 |
t_ds_user | 用户 |
t_ds_version | ds版本信息 |
核心表可以直接看文末附录。
2.2.1 类关系图 (用户/队列/数据源)
描述如下:
- 一个租户下可以有多个用户;
t_ds_user
中的queue
字段存储的是队列表中的queue_name
信息;t_ds_tenant
下存的是queue_id
,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列;t_ds_datasource
表中的user_id
字段表示创建该数据源的用户;t_ds_relation_datasource_user
中的user_id
表示,对数据源有权限的用户。
2.2.2 类关系图 (项目/资源/告警)
描述如下:
- 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定;
- t_ds_projcet表中的user_id表示创建该项目的用户;
- t_ds_relation_project_user表中的user_id表示对项目有权限的用户;
- t_ds_resources表中的user_id表示创建该资源的用户;
- t_ds_relation_resources_user中的user_id表示对资源有权限的用户;
- t_ds_udfs表中的user_id表示创建该UDF的用户;
- t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户。
2.2.3 类关系图 ( 命令/流程/任务)
描述如下:
- 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例;
- t_ds_schedulers表存放流程定义的定时调度信息;
- t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表
03 DolphinScheduler 源码分析
讲解源码前,先贴一份官网的启动流程图:
3.1 ExecutorController
根据上述的流程图,可以看到,我们在UI层点击“启动按钮”后,会访问dophinscheduler-api
目录的Api Server
服务,会进入到ExecutorController
(完整类路径:org.apache.dolphinscheduler.api.controller.ExecutorController),下面看看ExecutorController
提供了哪些接口方法?
以下是对各接口的描述:
接口 | 描述 |
/start-process-instance | 执行流程实例 |
/batch-start-process-instance | 批量执行流程实例 |
/execute | 操作流程实例,如:暂停, 停止, 重跑, 从暂停恢复,从停止恢复 |
/batch-execute | 批量操作流程实例 |
/start-check | 检查流程定义或检查所有的子流程定义是否在线 |
接下我们看看最核心的execute
方法:
/** * do action to process instance: pause, stop, repeat, recover from pause, recover from stop * * @param loginUser login user * @param projectCode project code * @param processInstanceId process instance id * @param executeType execute type * @return execute result code */ @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType ) { Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType); return returnDataList(result); }
可以看到execute接口,是直接使用ExecService去执行了,下面分析下。