3.2 ExecService
下面看看里面的execute方法,已经加好了注释:
/** * 操作工作流实例 * * @param loginUser 登录用户 * @param projectCode 项目编码 * @param processInstanceId 流程实例ID * @param executeType 执行类型(repeat running、resume pause、resume failure、stop、pause) * @return 执行结果 */ @Override public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) { /*** 查询项目信息 **/ Project project = projectMapper.queryByCode(projectCode); //check user access for project /*** 判断当前用户是否有操作权限 **/ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType)); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } /*** 检查Master节点是否存在 **/ if (!checkMasterExists(result)) { return result; } /*** 查询工作流实例详情 **/ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); if (processInstance == null) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } /*** 根据工作流实例绑定的流程定义ID查询流程定义 **/ ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { /*** 校验工作流定义能否执行(工作流是否存在?是否上线状态?存在子工作流定义不是上线状态?) 
**/ result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } } /*** 根据当前工作流实例的状态判断能否执行对应executeType类型的操作 **/ result = checkExecuteType(processInstance, executeType); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } /*** 判断是否已经选择了合适的租户 **/ if (!checkTenantSuitable(processDefinition)) { logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); } /*** 在executeType为重跑的状态下,获取用户指定的启动参数 **/ Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() { }); String startParams = null; if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS); if (startParamsJson != null) { startParams = startParamsJson.toString(); } } /*** 根据不同的ExecuteType去执行相应的操作 **/ switch (executeType) { case REPEAT_RUNNING: // 重跑 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: // 恢复挂载的工作流 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: // 启动失败的工作流 result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: // 停止 if (processInstance.getState() == ExecutionStatus.READY_STOP) { putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } 
else { result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); } break; case PAUSE: // 暂停 if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } else { result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE); } break; default: logger.error("unknown execute type : {}", executeType); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); break; } return result; }
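可以看到，以上代码的前半部分主要是各种校验操作，后半部分则根据执行类型（executeType）执行不同的操作。这些操作主要分为两类：
- 重跑（REPEAT_RUNNING）、恢复挂起（RECOVER_SUSPENDED_PROCESS）、从失败处启动（START_FAILURE_TASK_PROCESS）会调用 insertCommand 插入新命令；
- 停止（STOP）和暂停（PAUSE）会调用 updateProcessInstancePrepare，直接更新工作流实例的状态。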
3.2.1 insertCommand
方法代码如下（已添加注释），其主要逻辑就是生成命令，并将其插入 t_ds_command（执行命令表）：
/**
 * Builds a command that re-runs or recovers (from pause or failure) a workflow instance, and
 * stores it through {@code processService.createCommand}.
 *
 * @param loginUser             the user issuing the command; recorded as the command's executor
 * @param instanceId            id of the workflow instance the command targets
 * @param processDefinitionCode code of the workflow definition
 * @param processVersion        version of the workflow definition
 * @param commandType           type of the command to create
 * @param startParams           start parameters to attach to the command; skipped when empty
 * @return result map whose status is SUCCESS when the command was stored,
 *         PROCESS_INSTANCE_EXECUTING_COMMAND when verifyIsNeedCreateCommand rejects the command,
 *         or EXECUTE_PROCESS_INSTANCE_ERROR when storing it failed
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
                                          int processVersion, CommandType commandType, String startParams) {
    Map<String, Object> outcome = new HashMap<>();

    // Command parameters: always the instance to recover, plus the start parameters when present.
    Map<String, Object> params = new HashMap<>();
    params.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
    boolean hasStartParams = !StringUtils.isEmpty(startParams);
    if (hasStartParams) {
        params.put(CMD_PARAM_START_PARAMS, startParams);
    }

    Command command = new Command();
    command.setCommandType(commandType);
    command.setProcessDefinitionCode(processDefinitionCode);
    command.setCommandParam(JSONUtils.toJsonString(params));
    command.setExecutorId(loginUser.getId());
    command.setProcessDefinitionVersion(processVersion);
    command.setProcessInstanceId(instanceId);

    // verifyIsNeedCreateCommand returning false is reported as "instance is executing a command".
    boolean creationNeeded = processService.verifyIsNeedCreateCommand(command);
    if (!creationNeeded) {
        putMsg(outcome, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
        return outcome;
    }

    // Persist the command; a non-positive count means nothing was stored.
    int inserted = processService.createCommand(command);
    putMsg(outcome, inserted > 0 ? Status.SUCCESS : Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    return outcome;
}
3.2.2 updateProcessInstancePrepare
方法代码如下（已添加注释）。与 insertCommand 不同，它并不生成新命令，而是直接更新工作流实例的命令类型和状态，更新成功后再向该实例记录的 host 发送状态变更事件：
/** * 准备更新工作流实例的命令类型和状态 * * @param processInstance 工作流实例 * @param commandType 命令类型 * @param executionStatus 执行状态 * @return 更新结果 */ private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { Map<String, Object> result = new HashMap<>(); processInstance.setCommandType(commandType); processInstance.addHistoryCmd(commandType); processInstance.setState(executionStatus); int update = processService.updateProcessInstance(processInstance); // 判断流程是否正常 if (update > 0) { StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0 ); Host host = new Host(processInstance.getHost()); stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); } return result; }
对照流程图，可以看到如下红框部分的代码已经执行完毕，也就是说 command 已经写入（持久化）到了数据库中。接下来需要看看 Master 端的代码。
3.3 MasterServer
MasterServer 位于项目中的 /dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java 文件，对应架构图中的这一块：
代码及注释如下:
@SpringBootApplication @ComponentScan("org.apache.dolphinscheduler") @EnableTransactionManagement @EnableCaching public class MasterServer implements IStoppable { private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); @Autowired private SpringApplicationContext springApplicationContext; @Autowired private MasterRegistryClient masterRegistryClient; @Autowired private TaskPluginManager taskPluginManager; @Autowired private MasterSchedulerService masterSchedulerService; @Autowired private SchedulerApi schedulerApi; @Autowired private EventExecuteService eventExecuteService; @Autowired private FailoverExecuteThread failoverExecuteThread; @Autowired private MasterRPCServer masterRPCServer; public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); } /** * 启动 master server */ @PostConstruct public void run() throws SchedulerException { // 初始化 RPC服务 this.masterRPCServer.start(); //安装任务插件 this.taskPluginManager.installPlugin(); /*** MasterServer 注册客户端,用于连接到注册表并传递注册表事件。 * 当主节点启动时,它将在注册中心注册,并调度一个{@link HeartBeatTask}来更新注册表中的元数据**/ this.masterRegistryClient.init(); this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); // 主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。 this.masterSchedulerService.init(); this.masterSchedulerService.start(); this.eventExecuteService.start(); this.failoverExecuteThread.start(); //这是调度器的接口,包含操作调度任务的方法。 this.schedulerApi.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (Stopper.isRunning()) { close("MasterServer shutdownHook"); } })); } /** * 优雅的关闭方法 * * @param cause 关闭的原因 */ public void close(String cause) { try { // set stop signal is true // execute only once if (!Stopper.stop()) { logger.warn("MasterServer is already stopped, current cause: {}", cause); return; } logger.info("Master server is stopping, current cause : {}", cause); // thread sleep 3 seconds for thread quietly 
stop ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); // close this.schedulerApi.close(); this.masterSchedulerService.close(); this.masterRPCServer.close(); this.masterRegistryClient.closeRegistry(); // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc springApplicationContext.close(); logger.info("MasterServer stopped, current cause: {}", cause); } catch (Exception e) { logger.error("MasterServer stop failed, current cause: {}", cause, e); } } @Override public void stop(String cause) { close(cause); } }
在run方法里面,可以看到,主要依次执行了:
- ① MasterRPCServer.start():启动master的rpc服务;
- ② TaskPluginManager.installPlugin():安装任务插件;
- ③ MasterRegistryClient.start()：向注册中心（如 Zookeeper）注册 MasterServer；
- ④ MasterSchedulerService.start()：启动主调度线程，该线程会从数据库中获取命令，将其转换为工作流实例（processInstance）并触发执行；
- ⑤ EventExecuteService.start()：处理工作流实例的执行事件（推测，详见 3.3.5）；
- ⑥ FailoverExecuteThread.start()：启动故障转移检测线程；
- ⑦ SchedulerApi.start()：启动调度器，SchedulerApi 是用来操作调度任务的接口。
3.3.1 MasterRPCServer
MasterRPCServer 基于 Netty 实现，主要用来接收其他节点发来的请求（如任务执行结果、任务运行状态、状态事件、日志查看等），并按命令类型分发给对应的处理器。
初始化方法如下:
@PostConstruct private void init() { // 初始化远程服务 NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); // 日志服务 this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.start(); }
3.3.2 TaskPluginManager
TaskPluginManager 是任务插件的安装管理器，通过 SPI 机制加载任务插件，插件位于如下目录：
每个任务插件只需实现指定的工厂接口即可，如 Spark：
核心代码如下，通过 loadTaskChannel() 方法把 factory 缓存进 Map<String, TaskChannel> 集合：
3.3.3 MasterRegistryClient
我们知道DolphinScheduler使用的是去中心化思想,所以MasterRegistryClient主要的作用是注册MasterServer客户端,用于连接到注册表并传递注册表事件。当Master节点启动时,它将在注册中心注册,并调度一个HeartBeatTask来更新注册表中的元数据。
进入RegistryClient里面的subscribe方法,可以看到有两种注册中心,一种是MySQL的,另一种是Zookeeper的,这里应该默认使用了ZookeeperRegistry。
3.3.4 MasterSchedulerService
其init和run方法如下,init主要就是初始化一个工作流实例的队列:
看看里面实际运行的run方法,可以看到里面是一个死循环,不断地去执行scheduleWorkflow() 方法:
看看里面的scheduleWorkflow()方法,已写好注释:
/** * 从数据库中按槽位查询命令,转换为工作流实例,然后提交给workflowExecuteThreadPool。 */ private void scheduleWorkflow() throws InterruptedException, MasterException { // 从数据库中按槽位查询命令 List<Command> commands = findCommands(); if (CollectionUtils.isEmpty(commands)) { // indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); return; } // 转换为工作流实例 List<ProcessInstance> processInstances = command2ProcessInstance(commands); if (CollectionUtils.isEmpty(processInstances)) { // indicate that the command transform to processInstance error, sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); return; } MasterServerMetrics.incMasterConsumeCommand(commands.size()); for (ProcessInstance processInstance : processInstances) { //提交给workflowExecuteThreadPool submitProcessInstance(processInstance); } }
提交工作流实例方法如下,注意提交到了workflowExecuteThreadPool:
/** * 提交工作流实例给 workflowExecuteThreadPool * * @param processInstance 工作流实例 */ private void submitProcessInstance(@NonNull ProcessInstance processInstance) { try { LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); logger.info("Master schedule service starting workflow instance"); // 封装工作流实例Runnable final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( processInstance , processService , nettyExecutorManager , processAlertManager , masterConfig , stateWheelExecuteThread , curingGlobalParamsService); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); if (processInstance.getTimeout() > 0) { stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } ProcessInstanceMetrics.incProcessInstanceSubmit(); // 提交封装好的工作流实例Runnable给workflowExecuteThreadPool CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync( workflowExecuteRunnable::call, workflowExecuteThreadPool); workflowSubmitFuture.thenAccept(workflowSubmitStatue -> { if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) { // submit failed processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); submitFailedProcessInstances.add(processInstance); } }); logger.info("Master schedule service started workflow instance"); } catch (Exception ex) { processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex); } finally { LoggerUtils.removeWorkflowInstanceIdMDC(); } }
3.3.5 EventExecuteService
EventExecuteService没有类注释,猜测应该是用来传递每个工作流实例执行情况事件的。
3.3.6 FailoverExecuteThread
FailoverExecuteThread为故障转移检测线程。每隔10秒检测一次,核心方法在FailoverService里的failoverMasterWithLock()及failoverMaster()方法:
3.3.7 SchedulerApi
SchedulerApi是操作调度任务实例的接口,其主要功能是启动调度程序、插入或更新调度任务、删除调度任务、关闭调度任务和释放资源。










