DolphinScheduler教程(03)- 源码分析(三)

简介: DolphinScheduler教程(03)- 源码分析(三)

3.3.8 TaskPriorityQueueConsumer

根据流程图,我们可以知道TaskConsumer从队列里获取分割的任务,然后转发到Worker执行。

我们看看TaskPriorityQueueConsumer里的核心代码,可以看到里面的批量分发任务方法:

进入batchDispatch方法,可以看到从队列里获取任务分发任务:

最后分发任务给worker:

ok,到这里,我们可以看worker部分的代码了。

3.4 WorkerServer

MasterSerer在项目/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java目录。它的主程序代码与MasterServer基本一致,代码及注释如下:

@PostConstruct
public void run() {
  // worker rpc服务
    this.workerRpcServer.start();
  // 任务插件安装
    this.taskPluginManager.installPlugin();
    // 向Zookeeper注册客户端
    this.workerRegistryClient.registry();
    this.workerRegistryClient.setRegistryStoppable(this);
    Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
    this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
   // 管理Worker线程
    this.workerManagerThread.start();
    // 报告状态线程
    this.retryReportTaskStatusThread.start();
    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("WorkerServer shutdown hook");
        }
    }));
}

根据官网提供的流程图,我们需要重点留意的是TaskExecutePorcessor:

3.4.1 TaskExecutePorcessor

TaskExecuteProcessor/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java目录,毋庸置疑,其核心方法为:

@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
 // code ...
}

其中里面有一段较为核心的代码,提交任务TaskExecuteThread给管理者:

接下来看看TaskExecuteThread的代码。

3.4.2 TaskExecuteThread

TaskExecuteThread就是最终执行任务的代码了,里面的run方法如下,已加好注释:

@Override
public void run() {
    // dry run 预演模式
    if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
        taskExecutionContext.setStartTime(new Date());
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
            taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        return;
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
        //回调任务执行状态
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
        taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
        // 拷贝 hdfs/minio 文件到本地
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }
        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));
        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);
        // 给当前线程设置名称
        Thread.currentThread().setName(taskLogName);
        task = taskChannel.createTask(taskExecutionContext);
        // 执行任务插件方法 - init
        this.task.init();
        //init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
        // 执行任务插件方法 -  handle
        this.task.handle();
        // 判断是否需要发送告警
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
        }
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}

04 附录

4.1 核心表

① t_ds_process_definition(流程定义表):

字段 类型 注释
id int 主键
name varchar 流程定义名称
version int 流程定义版本
release_state tinyint 流程定义的发布状态:0 未上线 1已上线
project_id int 项目id
user_id int 流程定义所属用户id
process_definition_json longtext 流程定义json串
description text 流程定义描述
global_params text 全局参数
flag tinyint 流程是否可用:0 不可用,1 可用
locations text 节点坐标信息
connects text 节点连线信息
receivers text 收件人
receivers_cc text 抄送人
create_time datetime 创建时间
timeout int 超时时间
tenant_id int 租户id
update_time datetime 更新时间
modify_by varchar 修改用户
resource_ids varchar 资源id集

② t_ds_process_instance(流程实例表):

字段 类型 注释
id int 主键
name varchar 流程实例名称
process_definition_id int 流程定义id
state tinyint 流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
recovery tinyint 流程实例容错标识:0 正常,1 需要被容错重启
start_time datetime 流程实例开始时间
end_time datetime 流程实例结束时间
run_times int 流程实例运行次数
host varchar 流程实例所在的机器
command_type tinyint 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
command_param text 命令的参数(json格式)
task_depend_type tinyint 节点依赖类型:0 当前节点,1 向前执行,2 向后执行
max_try_times tinyint 最大重试次数
failure_strategy tinyint 失败策略 0 失败后结束,1 失败后继续
warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_id int 告警组id
schedule_time datetime 预期运行时间
command_start_time datetime 开始命令时间
global_params text 全局参数(固化流程定义的参数)
process_instance_json longtext 流程实例json(copy的流程定义的json)
flag tinyint 是否可用,1 可用,0不可用
update_time timestamp 更新时间
is_sub_process int 是否是子工作流 1 是,0 不是
executor_id int 命令执行用户
locations text 节点坐标信息
connects text 节点连线信息
history_cmd text 历史命令,记录所有对流程实例的操作
dependence_schedule_times text 依赖节点的预估时间
process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_group varchar 任务指定运行的worker分组
timeout int 超时时间
tenant_id int 租户id

③ t_ds_task_instance(任务实例表):

字段 类型 注释
id int 主键
name varchar 任务名称
task_type varchar 任务类型
process_definition_id int 流程定义id
process_instance_id int 流程实例id
task_json longtext 任务节点json
state tinyint 任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
submit_time datetime 任务提交时间
start_time datetime 任务开始时间
end_time datetime 任务结束时间
host varchar 执行任务的机器
execute_path varchar 任务执行路径
log_path varchar 任务日志路径
alert_flag tinyint 是否告警
retry_times int 重试次数
pid int 进程pid
app_link varchar yarn app id
flag tinyint 是否可用:0 不可用,1 可用
retry_interval int 重试间隔
max_retry_times int 最大重试次数
task_instance_priority int 任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_group varchar 任务指定运行的worker分组

④ t_ds_schedules(流程定时调度表):

字段 类型 注释
id int 主键
process_definition_id int 流程定义id
start_time datetime 调度开始时间
end_time datetime 调度结束时间
crontab varchar crontab 表达式
failure_strategy tinyint 失败策略: 0 结束,1 继续
user_id int 用户id
release_state tinyint 状态:0 未上线,1 上线
warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_id int 告警组id
process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_group varchar 任务指定运行的worker分组
create_time datetime 创建时间
update_time datetime 更新时间

⑤ t_ds_command(执行命令表):

字段 类型 注释
id int 主键
command_type tinyint 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
process_definition_id int 流程定义id
command_param text 命令的参数(json格式)
task_depend_type tinyint 节点依赖类型:0 当前节点,1 向前执行,2 向后执行
failure_strategy tinyint 失败策略:0结束,1继续
warning_type tinyint 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_id int 告警组
schedule_time datetime 预期运行时间
start_time datetime 开始时间
executor_id int 执行用户id
dependence varchar 依赖字段
update_time datetime 更新时间
process_instance_priority int 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_group varchar 任务指定运行的worker分组

05 文末

本文是个人阅读DolphinScheduler一些见解,后续或许会再更新,DolphinScheduler给我的感觉就是联通性不强,需要结合流程图才能去理解。最后在补上它的流程图,以做回顾吧

接下来的文章讲讲它的配置。

目录
相关文章
|
6月前
|
Kubernetes 监控 调度
K8S中Scheduler原理分析
【6月更文挑战第20天】K8S Scheduler是集群的关键组件,它监听API Server,为新Pod选择合适的Node。
|
SQL 监控 数据可视化
DolphinScheduler教程(02)- 系统架构设计(上)
DolphinScheduler教程(02)- 系统架构设计(上)
299 0
DolphinScheduler教程(02)- 系统架构设计(上)
|
7月前
|
存储 NoSQL Java
APScheduler简介
APScheduler简介
78 0
|
SQL 监控 数据可视化
DolphinScheduler教程(02)- 系统架构设计
DolphinScheduler教程(02)- 系统架构设计
1351 0
DolphinScheduler教程(02)- 系统架构设计
|
缓存 分布式计算 Java
DolphinScheduler教程(03)- 源码分析(二)
DolphinScheduler教程(03)- 源码分析(二)
241 0
|
SQL 分布式计算 Shell
DolphinScheduler教程(01)- 入门(下)
DolphinScheduler教程(01)- 入门(下)
565 0
|
分布式计算 应用服务中间件 持续交付
DolphinScheduler教程(01)- 入门(上)
DolphinScheduler教程(01)- 入门(上)
947 0
|
缓存 前端开发 Java
DolphinScheduler教程(04)- 项目配置分析
DolphinScheduler教程(04)- 项目配置分析
501 0
|
JSON 监控 Java
DolphinScheduler教程(02)- 系统架构设计(下)
DolphinScheduler教程(02)- 系统架构设计(下)
237 0
|
SQL 分布式计算 数据可视化
DolphinScheduler教程(01)- 入门
DolphinScheduler教程(01)- 入门
5662 3
DolphinScheduler教程(01)- 入门