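01 Introduction
Official Apache DolphinScheduler documentation: https://dolphinscheduler.apache.org/zh-cn/index.html
Earlier posts in this series covered the core DolphinScheduler concepts. If you are interested, see:
- "DolphinScheduler Tutorial (01) - Getting Started"
- "DolphinScheduler Tutorial (02) - System Architecture Design"
Apache DolphinScheduler is a distributed, decentralized, easily extensible visual DAG workflow scheduling platform. It aims to untangle the complex dependencies in data processing pipelines so that the scheduling system works out of the box.
Its architecture is shown in the following diagram:
The rest of this post walks through the source code step by step.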
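02 DolphinScheduler project structure
2.1 Structure analysis
First, use IDEA to clone the project from GitHub to your local machine. Project address: https://github.com/apache/dolphinscheduler
After importing the project, you can see its main core modules:
2.2 Table analysis
The project directory /dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql contains the database initialization scripts dolphinscheduler_ddl.sql and dolphinscheduler_dml.sql, along with some upgrade scripts:
After running them, the following tables appear in the database:
The core tables are listed in the appendix at the end of this post.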
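2.2.1 Relationship diagram (users / queues / data sources)
Description:
- A tenant can have multiple users;
- The queue field in t_ds_user stores the queue_name value from the queue table;
- t_ds_tenant stores queue_id. When a process definition runs, the user's queue takes priority; if the user's queue is empty, the tenant's queue is used;
- The user_id field in t_ds_datasource is the user who created the data source;
- The user_id field in t_ds_relation_datasource_user is a user who has permission on the data source.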
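The queue priority rule above (user queue first, tenant queue as the fallback) can be sketched as follows. This is only an illustration: QueueResolver and resolveQueue are hypothetical names and do not come from the DolphinScheduler code base, and treating a blank string as "empty" is an assumption.

```java
import java.util.Optional;

// Hypothetical helper (not DolphinScheduler code): picks the queue used for a run.
final class QueueResolver {
    private QueueResolver() {
    }

    // Returns the user's queue when it is set, otherwise the tenant's queue.
    // Assumption: null and blank strings both count as "empty".
    static Optional<String> resolveQueue(String userQueue, String tenantQueue) {
        if (userQueue != null && !userQueue.trim().isEmpty()) {
            return Optional.of(userQueue);
        }
        if (tenantQueue != null && !tenantQueue.trim().isEmpty()) {
            return Optional.of(tenantQueue);
        }
        return Optional.empty();
    }
}
```

For example, resolveQueue("etl", "default") yields "etl", while resolveQueue("", "default") falls back to "default".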
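2.2.2 Relationship diagram (projects / resources / alerts)
Description:
- A user can have multiple projects. Project authorization is done through the t_ds_relation_project_user table, which binds project_id to user_id;
- The user_id field in t_ds_project is the user who created the project;
- The user_id field in t_ds_relation_project_user is a user who has permission on the project;
- The user_id field in t_ds_resources is the user who created the resource;
- The user_id field in t_ds_relation_resources_user is a user who has permission on the resource;
- The user_id field in t_ds_udfs is the user who created the UDF;
- The user_id field in t_ds_relation_udfs_user is a user who has permission on the UDF.
2.2.3 Relationship diagram (commands / processes / tasks)
Description:
- A project has multiple process definitions, a process definition can produce multiple process instances, and a process instance can produce multiple task instances;
- The t_ds_schedules table stores the timed scheduling information of process definitions;
- The t_ds_relation_process_instance table handles process definitions that contain sub-processes: parent_process_instance_id is the id of the main process instance that contains the sub-process, process_instance_id is the id of the sub-process instance, and parent_task_instance_id is the task instance id of the sub-process node. Process instances and task instances are stored in t_ds_process_instance and t_ds_task_instance respectively.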
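03 DolphinScheduler source code analysis
Before going through the source code, here is the startup flowchart from the official website:
3.1 ExecutorController
According to the flowchart above, clicking the "Start" button in the UI sends a request to the Api Server service in the dolphinscheduler-api module, which lands in ExecutorController (full class name: org.apache.dolphinscheduler.api.controller.ExecutorController). Let's see which endpoints ExecutorController provides.
The endpoints are described below:
Next, let's look at the most important one, the execute method: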
/**
 * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param processInstanceId process instance id
 * @param executeType execute type
 * @return execute result code
 */
@ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
    @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
    @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
@ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                      @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                      @RequestParam("processInstanceId") Integer processInstanceId,
                      @RequestParam("executeType") ExecuteType executeType) {
    Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
    return returnDataList(result);
}
As you can see, the execute endpoint delegates directly to ExecService, which we analyze next.
3.2 ExecService
Here is its execute method, with comments added:
/**
 * Operate on a workflow instance
 *
 * @param loginUser login user
 * @param projectCode project code
 * @param processInstanceId process instance id
 * @param executeType execute type (repeat running, resume pause, resume failure, stop, pause)
 * @return execution result
 */
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
    // Look up the project
    Project project = projectMapper.queryByCode(projectCode);

    // Check whether the current user is allowed to operate on the project
    Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,
        ApiFuncIdentificationConstant.map.get(executeType));
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }

    // Check that a Master node exists
    if (!checkMasterExists(result)) {
        return result;
    }

    // Load the workflow instance details
    ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
    if (processInstance == null) {
        putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
        return result;
    }

    // Load the process definition bound to the workflow instance (by code and version)
    ProcessDefinition processDefinition = processService.findProcessDefinition(
        processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
    if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
        // Validate that the definition can run: does it exist, is it online,
        // and are all of its sub-workflow definitions online?
        result = checkProcessDefinitionValid(projectCode, processDefinition,
            processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    }

    // Check whether the instance's current state allows the requested executeType
    result = checkExecuteType(processInstance, executeType);
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }

    // Check whether a suitable tenant has been selected
    if (!checkTenantSuitable(processDefinition)) {
        logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
            processDefinition.getId(), processDefinition.getName());
        putMsg(result, Status.TENANT_NOT_SUITABLE);
    }

    // For a rerun, pick up the start parameters the user supplied originally
    Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(),
        new TypeReference<Map<String, Object>>() {
        });
    String startParams = null;
    if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
        Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
        if (startParamsJson != null) {
            startParams = startParamsJson.toString();
        }
    }

    // Perform the operation that matches the ExecuteType
    switch (executeType) {
        case REPEAT_RUNNING: // rerun
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
                processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
            break;
        case RECOVER_SUSPENDED_PROCESS: // resume a suspended workflow
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
                processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
            break;
        case START_FAILURE_TASK_PROCESS: // restart from the failed tasks
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
                processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
            break;
        case STOP: // stop
            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
            }
            break;
        case PAUSE: // pause
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
            }
            break;
        default:
            logger.error("unknown execute type : {}", executeType);
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
            break;
    }
    return result;
}
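As you can see, the first half of this code performs validation, and the second half performs a different operation depending on the execute type. The operations fall into two groups: insertCommand and updateProcessInstancePrepare.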
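The split between the two groups can be condensed into a small sketch. The enum constants mirror the ExecuteType values in the code above; ExecuteDispatchSketch, Path and pathFor are hypothetical names introduced only for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only: summarizes which branch of the switch each execute type takes.
final class ExecuteDispatchSketch {
    enum ExecuteType { REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE }

    // Hypothetical labels for the two groups of operations.
    enum Path { INSERT_COMMAND, UPDATE_INSTANCE_STATE }

    // STOP and PAUSE change the state of the running instance directly.
    private static final Set<ExecuteType> STATE_CHANGES = EnumSet.of(ExecuteType.STOP, ExecuteType.PAUSE);

    private ExecuteDispatchSketch() {
    }

    // All other types are turned into a new row in the command table.
    static Path pathFor(ExecuteType type) {
        return STATE_CHANGES.contains(type) ? Path.UPDATE_INSTANCE_STATE : Path.INSERT_COMMAND;
    }
}
```

In other words, rerun and recovery requests are queued as commands for a Master to pick up later, while stop and pause act on an instance that is already running.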
3.2.1 insertCommand
The method is shown below. It essentially builds a command and inserts it into t_ds_command (the execution command table). Comments have been added:
/**
 * Insert a command (re run, recovery (pause / failure) execution)
 *
 * @param loginUser login user
 * @param instanceId workflow instance id
 * @param processDefinitionCode workflow definition code
 * @param processVersion workflow definition version
 * @param commandType command type
 * @param startParams start parameters (may be null)
 * @return operation result
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
                                          int processVersion, CommandType commandType, String startParams) {
    Map<String, Object> result = new HashMap<>();

    // Build the command parameters
    Map<String, Object> cmdParam = new HashMap<>();
    cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
    if (!StringUtils.isEmpty(startParams)) {
        cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
    }

    Command command = new Command();
    command.setCommandType(commandType);
    command.setProcessDefinitionCode(processDefinitionCode);
    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
    command.setExecutorId(loginUser.getId());
    command.setProcessDefinitionVersion(processVersion);
    command.setProcessInstanceId(instanceId);

    // Check whether the workflow instance is already executing a command
    if (!processService.verifyIsNeedCreateCommand(command)) {
        putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
        return result;
    }

    // Persist the command
    int create = processService.createCommand(command);
    if (create > 0) {
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}
3.2.2 updateProcessInstancePrepare
The method is shown below, with comments added:
/**
 * Prepare to update the command type and state of a workflow instance
 *
 * @param processInstance workflow instance
 * @param commandType command type
 * @param executionStatus execution status
 * @return update result
 */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType,
                                                         ExecutionStatus executionStatus) {
    Map<String, Object> result = new HashMap<>();

    processInstance.setCommandType(commandType);
    processInstance.addHistoryCmd(commandType);
    processInstance.setState(executionStatus);
    int update = processService.updateProcessInstance(processInstance);

    // Check whether the update succeeded
    if (update > 0) {
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
            processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
        );
        Host host = new Host(processInstance.getHost());
        stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}
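Going back to the flowchart, we have now covered the part marked by the red box below: the command has been persisted to the database. Next we need to look at the Master code.
3.3 MasterServer
MasterServer is located at /dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java in the project. It corresponds to this part of the architecture diagram:
The code, with comments, is as follows: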
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {
    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);

    @Autowired
    private SpringApplicationContext springApplicationContext;

    @Autowired
    private MasterRegistryClient masterRegistryClient;

    @Autowired
    private TaskPluginManager taskPluginManager;

    @Autowired
    private MasterSchedulerService masterSchedulerService;

    @Autowired
    private SchedulerApi schedulerApi;

    @Autowired
    private EventExecuteService eventExecuteService;

    @Autowired
    private FailoverExecuteThread failoverExecuteThread;

    @Autowired
    private MasterRPCServer masterRPCServer;

    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        SpringApplication.run(MasterServer.class);
    }

    /**
     * Start the master server
     */
    @PostConstruct
    public void run() throws SchedulerException {
        // Start the RPC server
        this.masterRPCServer.start();

        // Install the task plugins
        this.taskPluginManager.installPlugin();

        // Registry client of the MasterServer, used to connect to the registry and deliver registry events.
        // When the master starts, it registers itself in the registry center and schedules a
        // {@link HeartBeatTask} to keep its metadata in the registry up to date.
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        // Main scheduler thread: consumes commands from the database and triggers the
        // execution of the corresponding processInstance.
        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        // Scheduler interface, which contains the methods for operating scheduled tasks.
        this.schedulerApi.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("MasterServer shutdownHook");
            }
        }));
    }

    /**
     * Graceful shutdown
     *
     * @param cause the reason for shutting down
     */
    public void close(String cause) {
        try {
            // set stop signal is true
            // execute only once
            if (!Stopper.stop()) {
                logger.warn("MasterServer is already stopped, current cause: {}", cause);
                return;
            }

            logger.info("Master server is stopping, current cause : {}", cause);

            // thread sleep 3 seconds for thread quietly stop
            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
            // close
            this.schedulerApi.close();
            this.masterSchedulerService.close();
            this.masterRPCServer.close();
            this.masterRegistryClient.closeRegistry();
            // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
            // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
            springApplicationContext.close();

            logger.info("MasterServer stopped, current cause: {}", cause);
        } catch (Exception e) {
            logger.error("MasterServer stop failed, current cause: {}", cause, e);
        }
    }

    @Override
    public void stop(String cause) {
        close(cause);
    }
}
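Inside the run method, the following steps are executed in order:
- ① MasterRPCServer.start(): starts the Master's RPC server;
- ② TaskPluginManager.installPlugin(): installs the task plugins;
- ③ MasterRegistryClient.start(): registers the MasterServer with the registry center (Zookeeper);
- ④ MasterSchedulerService.start(): the main scheduler thread, which consumes commands from the database and triggers the execution of the corresponding processInstance;
- ⑤ EventExecuteService.start(): handles the execution events of workflow instances;
- ⑥ FailoverExecuteThread.start(): failover detection;
- ⑦ SchedulerApi.start(): starts the scheduler interface used to operate scheduled tasks.
3.3.1 MasterRPCServer
The Master RPC Server is mainly used to send requests to, and receive requests from, other components.
Its initialization method is as follows: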
@PostConstruct
private void init() {
    // Initialize the remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);

    // Log service
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

    this.nettyRemotingServer.start();
}
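3.3.2 TaskPluginManager
TaskPluginManager is the plugin installation manager. It loads the task plugins through the SPI mechanism; the plugins live in the following directory:
Each task plugin only needs to implement the designated factory, for example Spark:
The core code is shown below. The loadTaskChannel() method caches each factory in a Map: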
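As a rough illustration of this "discover factories, then cache them in a Map" pattern, here is a minimal self-contained sketch. The interfaces are simplified stand-ins and not the real DolphinScheduler types; TaskPluginRegistrySketch, install and factory are hypothetical names. With real SPI discovery the factories would come from java.util.ServiceLoader, which is itself an Iterable and could be passed to install directly.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for a plugin registry; not the real TaskPluginManager.
final class TaskPluginRegistrySketch {
    // Simplified stand-ins for the plugin interfaces (assumed shapes).
    interface TaskChannel {
        String run(String params);
    }

    interface TaskChannelFactory {
        String getName();

        TaskChannel create();
    }

    // Task type name -> channel created by the matching factory.
    private final Map<String, TaskChannel> channels = new LinkedHashMap<>();

    // Caches one channel per factory; rejects two plugins that claim the same name.
    void install(Iterable<TaskChannelFactory> factories) {
        for (TaskChannelFactory factory : factories) {
            if (channels.containsKey(factory.getName())) {
                throw new IllegalStateException("Duplicate task plugin: " + factory.getName());
            }
            channels.put(factory.getName(), factory.create());
        }
    }

    // Returns null when no plugin is registered for the task type.
    TaskChannel get(String taskType) {
        return channels.get(taskType);
    }

    // Demo factory whose channel simply echoes "<name>:<params>".
    static TaskChannelFactory factory(String name) {
        return new TaskChannelFactory() {
            @Override
            public String getName() {
                return name;
            }

            @Override
            public TaskChannel create() {
                return params -> name + ":" + params;
            }
        };
    }
}
```

Looking a plugin up by task type then becomes a plain Map lookup, which is how TaskExecuteThread later resolves the channel for a task via getTaskChannelMap().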
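3.3.3 MasterRegistryClient
DolphinScheduler follows a decentralized design, so the main job of MasterRegistryClient is to act as the MasterServer's registry client: it connects to the registry and delivers registry events. When a Master node starts, it registers itself in the registry center and schedules a HeartBeatTask to keep its metadata in the registry up to date.
Stepping into the subscribe method of RegistryClient shows that there are two kinds of registry center, one backed by MySQL and one backed by Zookeeper. ZookeeperRegistry appears to be the default here.
3.3.4 MasterSchedulerService
Its init and run methods are shown below. init mainly initializes a queue of workflow instances:
Now look at the run method that does the actual work. It is an infinite loop that keeps calling scheduleWorkflow():
Here is the scheduleWorkflow() method, with comments added: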
/**
 * Query commands from the database by slot, convert them to workflow instances,
 * then submit them to workflowExecuteThreadPool.
 */
private void scheduleWorkflow() throws InterruptedException, MasterException {
    // Query commands from the database by slot
    List<Command> commands = findCommands();
    if (CollectionUtils.isEmpty(commands)) {
        // indicate that no command ,sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }

    // Convert the commands to workflow instances
    List<ProcessInstance> processInstances = command2ProcessInstance(commands);
    if (CollectionUtils.isEmpty(processInstances)) {
        // indicate that the command transform to processInstance error, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    MasterServerMetrics.incMasterConsumeCommand(commands.size());

    for (ProcessInstance processInstance : processInstances) {
        // Submit to workflowExecuteThreadPool
        submitProcessInstance(processInstance);
    }
}
The method that submits a workflow instance is shown below. Note that the instance is submitted to workflowExecuteThreadPool:
/**
 * Submit a workflow instance to workflowExecuteThreadPool
 *
 * @param processInstance workflow instance
 */
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        logger.info("Master schedule service starting workflow instance");
        // Wrap the workflow instance in a runnable
        final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
            processInstance
            , processService
            , nettyExecutorManager
            , processAlertManager
            , masterConfig
            , stateWheelExecuteThread
            , curingGlobalParamsService);

        this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
        if (processInstance.getTimeout() > 0) {
            stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
        }
        ProcessInstanceMetrics.incProcessInstanceSubmit();

        // Submit the wrapped runnable to workflowExecuteThreadPool
        CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
            workflowExecuteRunnable::call, workflowExecuteThreadPool);
        workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
            if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                // submit failed
                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
                submitFailedProcessInstances.add(processInstance);
            }
        });
        logger.info("Master schedule service started workflow instance");
    } catch (Exception ex) {
        processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
        stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
        logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
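The "poll commands, submit each one asynchronously, remember the ones that failed to submit" flow can be reduced to a small stand-alone sketch. Everything here is a simplification under assumptions: commands are plain strings, an in-memory queue stands in for the command table, and SchedulerLoopSketch, scheduleOnce and the "bad" prefix rule are hypothetical. Only the use of CompletableFuture.supplyAsync with a dedicated pool followed by thenAccept mirrors the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

// Illustrative only: one pass of a scheduler loop over an in-memory command queue.
final class SchedulerLoopSketch {
    enum SubmitStatus { SUCCESS, FAILED }

    // Stand-in for the command table.
    private final Queue<String> commands = new ConcurrentLinkedQueue<>();
    // Instances whose submission failed, kept for a later retry.
    private final Queue<String> submitFailed = new ConcurrentLinkedQueue<>();
    private final ExecutorService pool;

    SchedulerLoopSketch(ExecutorService pool) {
        this.pool = pool;
    }

    void addCommand(String command) {
        commands.add(command);
    }

    // Drains the pending commands and submits each one to the pool.
    // The returned futures complete once the submit status has been handled.
    List<CompletableFuture<Void>> scheduleOnce() {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        String command;
        while ((command = commands.poll()) != null) {
            final String instance = command;
            futures.add(CompletableFuture
                .supplyAsync(() -> start(instance), pool)
                .thenAccept(status -> {
                    if (status == SubmitStatus.FAILED) {
                        submitFailed.add(instance);
                    }
                }));
        }
        return futures;
    }

    // Assumed failure rule for the demo: names starting with "bad" fail to submit.
    private SubmitStatus start(String instance) {
        return instance.startsWith("bad") ? SubmitStatus.FAILED : SubmitStatus.SUCCESS;
    }

    List<String> failedInstances() {
        return new ArrayList<>(submitFailed);
    }
}
```

A caller would invoke scheduleOnce() in a loop and sleep briefly whenever it returns an empty list, which is the role Thread.sleep(Constants.SLEEP_TIME_MILLIS) plays in scheduleWorkflow().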
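3.3.5 EventExecuteService
EventExecuteService has no class-level comment. My guess is that it is used to deliver the execution events of each workflow instance.
3.3.6 FailoverExecuteThread
FailoverExecuteThread is the failover detection thread. It runs a check every 10 seconds. The core logic is in the failoverMasterWithLock() and failoverMaster() methods of FailoverService: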
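To give a feel for what such a periodic check might do, here is a hedged sketch. It assumes that failover means "find running workflow instances whose recorded host is no longer an active master"; that reading, and every name in the block (FailoverCheckSketch, instancesNeedingFailover, startPeriodicCheck), is an assumption for illustration and is not taken from FailoverService.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: a periodic failover check over in-memory data.
final class FailoverCheckSketch {
    private FailoverCheckSketch() {
    }

    // Returns the ids (sorted) of running instances whose host is not an active master.
    static List<Integer> instancesNeedingFailover(Map<Integer, String> runningInstanceHosts,
                                                  Set<String> activeMasters) {
        List<Integer> orphaned = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : runningInstanceHosts.entrySet()) {
            if (!activeMasters.contains(entry.getValue())) {
                orphaned.add(entry.getKey());
            }
        }
        Collections.sort(orphaned);
        return orphaned;
    }

    // Runs the given check repeatedly with a fixed delay between runs.
    // The caller is responsible for shutting the returned scheduler down.
    static ScheduledExecutorService startPeriodicCheck(Runnable check, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(check, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

With a period of 10 seconds, startPeriodicCheck(check, 10) would match the interval mentioned above. The "WithLock" suffix in failoverMasterWithLock() suggests the real implementation also takes a lock so that only one Master performs the failover at a time, which this sketch leaves out.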
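3.3.7 SchedulerApi
SchedulerApi is the interface for operating scheduled task instances. Its main functions are starting the scheduler, inserting or updating a scheduled task, deleting a scheduled task, and shutting down the scheduler and releasing its resources.
3.3.8 TaskPriorityQueueConsumer
From the flowchart we know that the TaskConsumer takes the split tasks from the queue and forwards them to a Worker for execution.
Looking at the core code of TaskPriorityQueueConsumer, we can see the method that dispatches tasks in batches:
Stepping into the batchDispatch method, we can see that it takes tasks from the queue and dispatches them:
Finally, the task is dispatched to a worker: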
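The batch-dispatch idea can be sketched with a standard PriorityBlockingQueue. This is a simplified model under assumptions: a lower number means higher priority, a task that fails to dispatch is put back on the queue, and PriorityDispatchSketch, Task and the Predicate-based dispatcher are hypothetical names, not the real TaskPriorityQueueConsumer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Predicate;

// Illustrative only: take up to N tasks by priority and hand them to a dispatcher.
final class PriorityDispatchSketch {
    static final class Task implements Comparable<Task> {
        final int priority; // assumption: lower value = dispatched first
        final String name;

        Task(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Task other) {
            return Integer.compare(priority, other.priority);
        }
    }

    private final PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

    void submit(Task task) {
        queue.put(task);
    }

    // Dispatches at most batchSize tasks; returns the names of the dispatched ones.
    // Tasks the dispatcher rejects are re-queued after the batch for a later attempt.
    List<String> batchDispatch(int batchSize, Predicate<Task> dispatcher) {
        List<String> dispatched = new ArrayList<>();
        List<Task> failed = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Task task = queue.poll();
            if (task == null) {
                break; // queue drained
            }
            if (dispatcher.test(task)) {
                dispatched.add(task.name);
            } else {
                failed.add(task);
            }
        }
        queue.addAll(failed);
        return dispatched;
    }

    int pending() {
        return queue.size();
    }
}
```

Re-queueing only after the loop keeps a task that cannot be dispatched from being retried repeatedly within the same batch.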
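OK, at this point we can move on to the Worker code.
3.4 WorkerServer
WorkerServer is located at /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java in the project. Its main program is largely the same as MasterServer's. The code, with comments, is as follows: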
@PostConstruct
public void run() {
    // Worker RPC server
    this.workerRpcServer.start();

    // Install the task plugins
    this.taskPluginManager.installPlugin();

    // Register the worker with the registry center (Zookeeper)
    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
    this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);

    // Thread that manages the worker's task threads
    this.workerManagerThread.start();

    // Thread that retries reporting task status
    this.retryReportTaskStatusThread.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}
According to the flowchart from the official website, the class to focus on is TaskExecuteProcessor:
3.4.1 TaskExecuteProcessor
TaskExecuteProcessor is located at /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java. Its core method is:
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
    // code ...
}
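One key piece of code inside it submits the task, wrapped in a TaskExecuteThread, to the manager thread:
Next, let's look at the TaskExecuteThread code.
3.4.2 TaskExecuteThread
TaskExecuteThread is the code that finally executes the task. Its run method is shown below, with comments added: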
@Override
public void run() {
    // Dry-run mode: report success without executing anything
    if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

        // Report the RUNNING status back to the master
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);

        // Copy files from hdfs/minio to the local execution directory
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
            taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }

        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setTaskAppId(String.format("%s_%s",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()));

        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.",
                taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
            taskExecutionContext.getProcessDefineCode(),
            taskExecutionContext.getProcessDefineVersion(),
            taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);

        // Name the current thread after the task log
        Thread.currentThread().setName(taskLogName);

        task = taskChannel.createTask(taskExecutionContext);

        // Task plugin lifecycle: init
        this.task.init();

        // init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

        // Task plugin lifecycle: handle
        this.task.handle();

        // Send an alert if the task asks for one
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }

        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}",
            taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
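Stripped of the details, run() follows a "report RUNNING, run the plugin's init and handle, always report the final status" shape. The sketch below isolates that shape. TaskRunSketch, its Task interface and the rule that exit code 0 means success are simplifications and assumptions, not the real plugin API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the status-reporting skeleton of a task run.
final class TaskRunSketch {
    enum Status { RUNNING, SUCCESS, FAILURE }

    // Simplified stand-in for a task plugin: handle() returns an exit code.
    interface Task {
        default void init() throws Exception {
        }

        int handle() throws Exception;
    }

    private TaskRunSketch() {
    }

    // Returns the statuses in the order they would be reported to the master.
    static List<Status> run(Task task) {
        List<Status> reported = new ArrayList<>();
        Status finalStatus = Status.FAILURE;
        reported.add(Status.RUNNING);
        try {
            task.init();
            // Assumption: exit code 0 means success, anything else is a failure.
            finalStatus = task.handle() == 0 ? Status.SUCCESS : Status.FAILURE;
        } catch (Exception e) {
            finalStatus = Status.FAILURE;
        } finally {
            // The final status is reported whether or not the task threw.
            reported.add(finalStatus);
        }
        return reported;
    }

    static Task exitingWith(int code) {
        return () -> code;
    }

    static Task throwing() {
        return () -> {
            throw new IllegalStateException("boom");
        };
    }
}
```

Placing the final report in the finally block is what ensures the master hears back even when the plugin throws, just as sendTaskExecuteResponseCommand sits in the finally block of the real method.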
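04 Appendix
4.1 Core tables
① t_ds_process_definition (process definition table):
② t_ds_process_instance (process instance table):
③ t_ds_task_instance (task instance table):
④ t_ds_schedules (process timed scheduling table):
⑤ t_ds_command (execution command table):
05 Closing
This post records my personal takeaways from reading the DolphinScheduler source code, and I may update it later. My impression is that the code paths are not tightly connected, so you need the flowchart at hand to follow them. Finally, here is the flowchart once more as a recap:
The next post will cover its configuration.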