DolphinScheduler教程(03)- 源码分析

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: DolphinScheduler教程(03)- 源码分析

01 引言


Apache DolphinScheduler官方文档地址:https://dolphinscheduler.apache.org/zh-cn/index.html


在前面的博客,已经讲解了DolphinScheduler相关的概念,有兴趣的同学可以参考下:


  • 《DolphinScheduler教程(01)- 入门》
  • 《DolphinScheduler教程(02)- 系统架构设计》


Apache DolphinScheduler是一个分布式去中心化,易扩展的可视化DAG工作流任务调度平台。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。


其原理图如下:

25b662a8f04248fd9befa85ec7b3aa65.png


接下来,本文一步一步详细地讲解其源码。


02 DolphinScheduler 项目结构


2.1 结构分析


首先使用IDEAGithub clone项目到本地,项目地址https://github.com/apache/dolphinscheduler

image.png



导入项目后,可以看到其主要核心模块如下:

微信截图_20221011204146.png


2.2 表分析


在项目/dolphinscheduler-dao/src/main/resources/sql/create/release-1.0.0_schema/mysql目录下,有数据库初始化脚本dolphinscheduler_ddl.sql及dolphinscheduler_dml.sql脚本以及一些升级脚本:

378e8f352d7c4c40bb82d0349dc69e5e.png


执行完后,可以在数据库里看到有如下表:

微信截图_20221011204314.png


核心表可以直接看文末附录。


2.2.1 类关系图 (用户/队列/数据源)

8254c781decb4476aca6990cc0918363.png


描述如下:

  • 一个租户下可以有多个用户;
  • t_ds_user中的queue字段存储的是队列表中的queue_name信息;
  • t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列;
  • t_ds_datasource表中的user_id字段表示创建该数据源的用户;
  • t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户。


2.2.2 类关系图 (项目/资源/告警)

96e0dca0b8844482b39dbc132e6e11b6.png


描述如下:


  • 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定;
  • t_ds_projcet表中的user_id表示创建该项目的用户;
  • t_ds_relation_project_user表中的user_id表示对项目有权限的用户;
  • t_ds_resources表中的user_id表示创建该资源的用户;
  • t_ds_relation_resources_user中的user_id表示对资源有权限的用户;
  • t_ds_udfs表中的user_id表示创建该UDF的用户;
  • t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户。


2.2.3 类关系图 ( 命令/流程/任务)

f16ef281faa04012acb3f6805c70488f.png


e1b2db2e4a2e44e783745b0725c8ca43.png


描述如下:


  • 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例;
  • t_ds_schedulers表存放流程定义的定时调度信息;
  • t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表

03 DolphinScheduler 源码分析


讲解源码前,先贴一份官网的启动流程图:

3301059bdb22468fb020a8502b9b439a.png

3.1 ExecutorController


根据上述的流程图,可以看到,我们在UI层点击“启动按钮”后,会访问dophinscheduler-api目录的Api Server服务,会进入到ExecutorController(完整类路径:org.apache.dolphinscheduler.api.controller.ExecutorController),下面看看ExecutorController提供了哪些接口方法?

41b6c91cd2944aad86388fff23d6b0a3.png


以下是对各接口的描述:

QQ截图20221011204618.png


接下我们看看最核心的execute方法:

 /**
     * do action to process instance: pause, stop, repeat, recover from pause, recover from stop
     *
     * @param loginUser login user
     * @param projectCode project code
     * @param processInstanceId process instance id
     * @param executeType execute type
     * @return execute result code
     */
    @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
            @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
    })
    @PostMapping(value = "/execute")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                          @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
                          @RequestParam("processInstanceId") Integer processInstanceId,
                          @RequestParam("executeType") ExecuteType executeType
    ) {
        Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
        return returnDataList(result);
    }


可以看到execute接口,是直接使用ExecService去执行了,下面分析下。


3.2 ExecService


下面看看里面的execute方法,已经加好了注释:

/**
 * 操作工作流实例
 *
 * @param loginUser         登录用户
 * @param projectCode       项目编码
 * @param processInstanceId 流程实例ID
 * @param executeType       执行类型(repeat running、resume pause、resume failure、stop、pause)
 * @return 执行结果
 */
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType) {
    /*** 查询项目信息 **/
    Project project = projectMapper.queryByCode(projectCode);
    //check user access for project
    /*** 判断当前用户是否有操作权限 **/
    Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType));
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }
    /*** 检查Master节点是否存在 **/
    if (!checkMasterExists(result)) {
        return result;
    }
    /*** 查询工作流实例详情 **/
    ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
    if (processInstance == null) {
        putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
        return result;
    }
    /*** 根据工作流实例绑定的流程定义ID查询流程定义 **/
    ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
        /*** 校验工作流定义能否执行(工作流是否存在?是否上线状态?存在子工作流定义不是上线状态?) **/
        result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        if (result.get(Constants.STATUS) != Status.SUCCESS) {
            return result;
        }
    }
    /*** 根据当前工作流实例的状态判断能否执行对应executeType类型的操作 **/
    result = checkExecuteType(processInstance, executeType);
    if (result.get(Constants.STATUS) != Status.SUCCESS) {
        return result;
    }
    /*** 判断是否已经选择了合适的租户 **/
    if (!checkTenantSuitable(processDefinition)) {
        logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
                processDefinition.getId(), processDefinition.getName());
        putMsg(result, Status.TENANT_NOT_SUITABLE);
    }
    /*** 在executeType为重跑的状态下,获取用户指定的启动参数 **/
    Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
    });
    String startParams = null;
    if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
        Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
        if (startParamsJson != null) {
            startParams = startParamsJson.toString();
        }
    }
    /*** 根据不同的ExecuteType去执行相应的操作 **/
    switch (executeType) {
        case REPEAT_RUNNING: // 重跑
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
            break;
        case RECOVER_SUSPENDED_PROCESS: // 恢复挂载的工作流
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
            break;
        case START_FAILURE_TASK_PROCESS: // 启动失败的工作流
            result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
            break;
        case STOP: // 停止
            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
            }
            break;
        case PAUSE: // 暂停
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
            } else {
                result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
            }
            break;
        default:
            logger.error("unknown execute type : {}", executeType);
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
            break;
    }
    return result;
}


可以看到,以上代码前半部分主要是做了校验的操作,后半部分是根据执行类型来做不同的操作,操作主要分为两部分:insertCommand以及updateProcessInstancePrepare。


3.2.1 insertCommand


方法代码如下,其实主要就是把生成命令并插入t_ds_command(执行命令表),插入已经添加好注释:

/**
 * 插入命令(re run, recovery (pause / failure) execution)
 *
 * @param loginUser             登录用户
 * @param instanceId            工作流实例id
 * @param processDefinitionCode 工作流定义id
 * @param processVersion        工作流版本
 * @param commandType           命令类型
 * @return 操作结果
 */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) {
    Map<String, Object> result = new HashMap<>();
    /*** 封装启动参数 **/
    Map<String, Object> cmdParam = new HashMap<>();
    cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
    if (!StringUtils.isEmpty(startParams)) {
        cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
    }
    Command command = new Command();
    command.setCommandType(commandType);
    command.setProcessDefinitionCode(processDefinitionCode);
    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
    command.setExecutorId(loginUser.getId());
    command.setProcessDefinitionVersion(processVersion);
    command.setProcessInstanceId(instanceId);
    /*** 判断工作流实例是否正在执行 **/
    if (!processService.verifyIsNeedCreateCommand(command)) {
        putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
        return result;
    }
    /*** 保存命令 **/
    int create = processService.createCommand(command);
    if (create > 0) {
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}


3.2.2 updateProcessInstancePrepare


方法代码如下,已经添加注释

/**
 * 准备更新工作流实例的命令类型和状态
 *
 * @param processInstance 工作流实例
 * @param commandType     命令类型
 * @param executionStatus 执行状态
 * @return 更新结果
 */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
    Map<String, Object> result = new HashMap<>();
    processInstance.setCommandType(commandType);
    processInstance.addHistoryCmd(commandType);
    processInstance.setState(executionStatus);
    int update = processService.updateProcessInstance(processInstance);
    // 判断流程是否正常
    if (update > 0) {
        StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
                processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
        );
        Host host = new Host(processInstance.getHost());
        stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
        putMsg(result, Status.SUCCESS);
    } else {
        putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
    }
    return result;
}


根据流程图,我们可以看到了已经执行了如下红框的代码,也就是把我们的command已经缓存到了DB。接下来需要看看Master的代码

05397d6a600a4844a48c0e63f659f457.png


3.3 MasterServer


MasterSerer在项目/dolphinscheduler-dev/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java目录。它对应的是架构图的这一块:

c8b664660bbd430d8f6e3988593205eb.png


代码及注释如下

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {
    private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
    @Autowired
    private SpringApplicationContext springApplicationContext;
    @Autowired
    private MasterRegistryClient masterRegistryClient;
    @Autowired
    private TaskPluginManager taskPluginManager;
    @Autowired
    private MasterSchedulerService masterSchedulerService;
    @Autowired
    private SchedulerApi schedulerApi;
    @Autowired
    private EventExecuteService eventExecuteService;
    @Autowired
    private FailoverExecuteThread failoverExecuteThread;
    @Autowired
    private MasterRPCServer masterRPCServer;
    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
        SpringApplication.run(MasterServer.class);
    }
    /**
     * 启动 master server
     */
    @PostConstruct
    public void run() throws SchedulerException {
        // 初始化 RPC服务
        this.masterRPCServer.start();
        //安装任务插件
        this.taskPluginManager.installPlugin();
        /*** MasterServer 注册客户端,用于连接到注册表并传递注册表事件。
         * 当主节点启动时,它将在注册中心注册,并调度一个{@link HeartBeatTask}来更新注册表中的元数据**/
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);
        // 主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。
        this.masterSchedulerService.init();
        this.masterSchedulerService.start();
        this.eventExecuteService.start();
        this.failoverExecuteThread.start();
        //这是调度器的接口,包含操作调度任务的方法。
        this.schedulerApi.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("MasterServer shutdownHook");
            }
        }));
    }
    /**
     * 优雅的关闭方法
     *
     * @param cause 关闭的原因
     */
    public void close(String cause) {
        try {
            // set stop signal is true
            // execute only once
            if (!Stopper.stop()) {
                logger.warn("MasterServer is already stopped, current cause: {}", cause);
                return;
            }
            logger.info("Master server is stopping, current cause : {}", cause);
            // thread sleep 3 seconds for thread quietly stop
            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
            // close
            this.schedulerApi.close();
            this.masterSchedulerService.close();
            this.masterRPCServer.close();
            this.masterRegistryClient.closeRegistry();
            // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
            // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
            springApplicationContext.close();
            logger.info("MasterServer stopped, current cause: {}", cause);
        } catch (Exception e) {
            logger.error("MasterServer stop failed, current cause: {}", cause, e);
        }
    }
    @Override
    public void stop(String cause) {
        close(cause);
    }
}


在run方法里面,可以看到,主要依次执行了:


  • ① MasterRPCServer.start():启动master的rpc服务;
  • ② TaskPluginManager.installPlugin():安装任务插件;
  • ③ MasterRegistryClient.start():向Zookeeper注册MasterServer;
  • ④ MasterSchedulerService.start():主调度程序线程,该线程将使用来自数据库的命令并触发执行的processInstance。
  • ⑤ EventExecuteService.start():工作流实例执行情况
  • ⑥ FailoverExecuteThread():故障转移检测
  • ⑦ SchedulerApi.start():scheduler接口去操作任务实例


3.3.1 MasterRPCServer


Master RPC Server主要用来发送或接收请求给其它系统。


初始化方法如下:

@PostConstruct
private void init() {
    // 初始化远程服务
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
    // 日志服务
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.start();
}


3.3.2 TaskPluginManager


插件安装管理器,通过SPI机制加载数据源插件的,其中插件在如下目录:

f2585ff5e84349ddba18df9acdeddc44.png


只要每个数据源插件实现指定的工厂即可,如Spark:

2c35f209195144689d7dbb635fec8d66.png


核心代码如下,并通过loadTaskChannel ()方法,把factory缓存进Map集合:

1af9a92c55ef409e8e3085169e72e5f8.png


3.3.3 MasterRegistryClient


我们知道DolphinScheduler使用的是去中心化思想,所以MasterRegistryClient主要的作用是注册MasterServer客户端,用于连接到注册表并传递注册表事件。当Master节点启动时,它将在注册中心注册,并调度一个HeartBeatTask来更新注册表中的元数据。

b95c685ac835467a810ecbb1d331c708.png


进入RegistryClient里面的subscribe方法,可以看到有两种注册中心,一种是MySQL的,另一种是Zookeeper的,这里应该默认使用了ZookeeperRegistry。

904c8582aabd42b5a190a4350a994aa8.png


3.3.4 MasterSchedulerService


其init和run方法如下,init主要就是初始化一个工作流实例的队列

481ee54c61d3431183907ad0746e36a9.png


看看里面实际运行的run方法,可以看到里面是一个死循环,不断地去执行scheduleWorkflow() 方法:

2d04300d5dd3457aaa696e92bac401a3.png


看看里面的scheduleWorkflow()方法,已写好注释:

/**
 * 从数据库中按槽位查询命令,转换为工作流实例,然后提交给workflowExecuteThreadPool。
 */
private void scheduleWorkflow() throws InterruptedException, MasterException {
    // 从数据库中按槽位查询命令
    List<Command> commands = findCommands();
    if (CollectionUtils.isEmpty(commands)) {
        // indicate that no command ,sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    // 转换为工作流实例
    List<ProcessInstance> processInstances = command2ProcessInstance(commands);
    if (CollectionUtils.isEmpty(processInstances)) {
        // indicate that the command transform to processInstance error, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    MasterServerMetrics.incMasterConsumeCommand(commands.size());
    for (ProcessInstance processInstance : processInstances) {
        //提交给workflowExecuteThreadPool
        submitProcessInstance(processInstance);
    }
}


提交工作流实例方法如下,注意提交到了workflowExecuteThreadPool

/**
 * 提交工作流实例给 workflowExecuteThreadPool
 *
 * @param processInstance 工作流实例
 */
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        logger.info("Master schedule service starting workflow instance");
        // 封装工作流实例Runnable
        final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                processInstance
                , processService
                , nettyExecutorManager
                , processAlertManager
                , masterConfig
                , stateWheelExecuteThread
                , curingGlobalParamsService);
        this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
        if (processInstance.getTimeout() > 0) {
            stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
        }
        ProcessInstanceMetrics.incProcessInstanceSubmit();
        // 提交封装好的工作流实例Runnable给workflowExecuteThreadPool
        CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
                workflowExecuteRunnable::call, workflowExecuteThreadPool);
        workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
            if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
                // submit failed
                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
                submitFailedProcessInstances.add(processInstance);
            }
        });
        logger.info("Master schedule service started workflow instance");
    } catch (Exception ex) {
        processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
        stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
        logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}


3.3.5 EventExecuteService


EventExecuteService没有类注释,猜测应该是用来传递每个工作流实例执行情况事件的。

9c3674c89a9f47159dc3bfdafec67214.png


3.3.6 FailoverExecuteThread


FailoverExecuteThread为故障转移检测线程。每隔10秒检测一次,核心方法在FailoverService里的failoverMasterWithLock()failoverMaster()方法:

8cb4e381d8964cbba879ff74276a572b.png

3.3.7 SchedulerApi


SchedulerApi是操作调度任务实例的接口,其主要功能是启动调度程序、插入或更新调度任务、删除调度任务、关闭调度任务和释放资源。


3.3.8 TaskPriorityQueueConsumer


根据流程图,我们可以知道TaskConsumer从队列里获取分割的任务,然后转发到Worker执行。

784d31061f7b40ccba59fe7132e3b9b2.png


我们看看TaskPriorityQueueConsumer里的核心代码,可以看到里面的批量分发任务方法:

162b31af052d4547a9ffe0ab1fb819ce.png


进入batchDispatch方法,可以看到从队列里获取任务分发任务:

image.png


最后分发任务给worker:12b99a2160dc4bb2a64aa631aec866d5.png


ok,到这里,我们可以看worker部分的代码了。


3.4 WorkerServer


MasterSerer在项目/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java目录。它的主程序代码与MasterServer基本一致,代码及注释如下:

@PostConstruct
public void run() {
  // worker rpc服务
    this.workerRpcServer.start();
  // 任务插件安装
    this.taskPluginManager.installPlugin();
    // 向Zookeeper注册客户端
    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
    this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
   // 管理Worker线程
    this.workerManagerThread.start();
    // 报告状态线程
    this.retryReportTaskStatusThread.start();
    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}


根据官网提供的流程图,我们需要重点留意的是TaskExecutePorcessor:

9b40087c3a66409aac5e1d6aa8edfeee.png


3.4.1 TaskExecutePorcessor


TaskExecuteProcessor在 /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java目录,毋庸置疑,其核心方法为:

@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
 // code ...
}


其中里面有一段较为核心的代码,提交任务TaskExecuteThread给管理者:

8dd898faf1bc402b8291ba12c3b1309f.png


接下来看看TaskExecuteThread的代码。


3.4.2 TaskExecuteThread


TaskExecuteThread就是最终执行任务的代码了,里面的run方法如下,已加好注释:

@Override
public void run() {
    // dry run 预演模式
    if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
        //回调任务执行状态
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
        // 拷贝 hdfs/minio 文件到本地
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }
        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));
        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);
        // 给当前线程设置名称
        Thread.currentThread().setName(taskLogName);
        task = taskChannel.createTask(taskExecutionContext);
        // 执行任务插件方法 - init
        this.task.init();
        //init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
        // 执行任务插件方法 -  handle
        this.task.handle();
        // 判断是否需要发送告警
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}


04 附录


4.1 核心表


① t_ds_process_definition(流程定义表):

QQ截图20221011210042.png


② t_ds_process_instance(流程实例表):

微信截图_20221011210126.png


③ t_ds_task_instance(任务实例表):

微信截图_20221011210143.png


④ t_ds_schedules(流程定时调度表):

微信截图_20221011210155.png


⑤ t_ds_command(执行命令表):

微信截图_20221011210204.png


05 文末


本文是个人阅读DolphinScheduler一些见解,后续或许会再更新,DolphinScheduler给我的感觉就是联通性不强,需要结合流程图才能去理解。最后在补上它的流程图,以做回顾吧

54392beadb2b4dffac37c22a1d355806.png

接下来的文章讲讲它的配置。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
6月前
|
调度 Apache
airflow scheduler -D 是什么作用
【6月更文挑战第30天】airflow scheduler -D 是什么作用
109 1
|
JSON Java 调度
DolphinScheduler教程(03)- 源码分析(三)
DolphinScheduler教程(03)- 源码分析(三)
556 0
|
4月前
|
分布式计算 Hadoop 关系型数据库
dolphinscheduler搭建
先根据伪集群来部署、部署中参考非伪集群 1、mysql数据库 mysql -h主机地址 -u用户名 -p 2、查看等 less:G、上下按键、ctrl+b、ctrl+f、q find / -iname '*mysql*' 更改目录所有者 chown -R dolphinscheduler:dolphinscheduler apache-dolphinscheduler-*-bin /usr/apache-dolphinscheduler-3.1.0-bin/tools/libs 里面也需要mysql驱动
77 1
|
6月前
|
Kubernetes 监控 调度
K8S中Scheduler原理分析
【6月更文挑战第20天】K8S Scheduler是集群的关键组件,它监听API Server,为新Pod选择合适的Node。
|
SQL 监控 数据可视化
DolphinScheduler教程(02)- 系统架构设计(上)
DolphinScheduler教程(02)- 系统架构设计(上)
280 0
DolphinScheduler教程(02)- 系统架构设计(上)
|
6月前
|
存储 调度 Apache
airflow scheduler 这些命令是什么作用
【6月更文挑战第30天】airflow scheduler 这些命令是什么作用
59 0
|
7月前
|
存储 NoSQL Java
APScheduler简介
APScheduler简介
72 0
|
7月前
|
Java 调度 数据库管理
APScheduler自定义配置
APScheduler自定义配置
71 0
|
7月前
|
数据可视化 Linux 调度
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
801 0
|
缓存 分布式计算 Java
DolphinScheduler教程(03)- 源码分析(二)
DolphinScheduler教程(03)- 源码分析(二)
223 0