3.3.8 TaskPriorityQueueConsumer
根据流程图,我们可以知道TaskConsumer
从队列里获取分割的任务,然后转发到Worker执行。
我们看看TaskPriorityQueueConsumer
里的核心代码,可以看到里面的批量分发任务方法:
进入batchDispatch方法,可以看到从队列里获取任务分发任务:
最后分发任务给worker:
ok,到这里,我们可以看worker部分的代码了。
3.4 WorkerServer
MasterSerer在项目/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
目录。它的主程序代码与MasterServer基本一致,代码及注释如下:
@PostConstruct public void run() { // worker rpc服务 this.workerRpcServer.start(); // 任务插件安装 this.taskPluginManager.installPlugin(); // 向Zookeeper注册客户端 this.workerRegistryClient.registry(); this.workerRegistryClient.setRegistryStoppable(this); Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); // 管理Worker线程 this.workerManagerThread.start(); // 报告状态线程 this.retryReportTaskStatusThread.start(); /* * registry hooks, which are called before the process exits */ Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (Stopper.isRunning()) { close("WorkerServer shutdown hook"); } })); }
根据官网提供的流程图,我们需要重点留意的是TaskExecutePorcessor:
3.4.1 TaskExecutePorcessor
TaskExecuteProcessor在 /dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
目录,毋庸置疑,其核心方法为:
@Counted(value = "ds.task.execution.count", description = "task execute total count") @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override public void process(Channel channel, Command command) { // code ... }
其中里面有一段较为核心的代码,提交任务TaskExecuteThread
给管理者:
接下来看看TaskExecuteThread的代码。
3.4.2 TaskExecuteThread
TaskExecuteThread就是最终执行任务的代码了,里面的run方法如下,已加好注释:
@Override public void run() { // dry run 预演模式 if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS); taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setEndTime(new Date()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); return; } try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); logger.info("script path : {}", taskExecutionContext.getExecutePath()); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); //回调任务执行状态 taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext); // 拷贝 hdfs/minio 文件到本地 List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources()); if (!fileDownloads.isEmpty()) { downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads); } taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setTaskAppId(String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); } String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskExecutionContext.setTaskLogName(taskLogName); // 给当前线程设置名称 Thread.currentThread().setName(taskLogName); task = taskChannel.createTask(taskExecutionContext); // 执行任务插件方法 - init this.task.init(); //init varPool this.task.getParameters().setVarPool(taskExecutionContext.getVarPool()); // 执行任务插件方法 - handle this.task.handle(); // 判断是否需要发送告警 if (this.task.getNeedAlert()) { sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode()); } taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode())); taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); taskExecutionContext.setProcessId(this.task.getProcessId()); taskExecutionContext.setAppIds(this.task.getAppIds()); taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus()); } catch (Throwable e) { logger.error("task scheduler failure", e); kill(); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); taskExecutionContext.setProcessId(this.task.getProcessId()); taskExecutionContext.setAppIds(this.task.getAppIds()); } finally { TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); clearTaskExecPath(); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } }
04 附录
4.1 核心表
① t_ds_process_definition(流程定义表):
字段 | 类型 | 注释 |
id | int | 主键 |
name | varchar | 流程定义名称 |
version | int | 流程定义版本 |
release_state | tinyint | 流程定义的发布状态:0 未上线 1已上线 |
project_id | int | 项目id |
user_id | int | 流程定义所属用户id |
process_definition_json | longtext | 流程定义json串 |
description | text | 流程定义描述 |
global_params | text | 全局参数 |
flag | tinyint | 流程是否可用:0 不可用,1 可用 |
locations | text | 节点坐标信息 |
connects | text | 节点连线信息 |
receivers | text | 收件人 |
receivers_cc | text | 抄送人 |
create_time | datetime | 创建时间 |
timeout | int | 超时时间 |
tenant_id | int | 租户id |
update_time | datetime | 更新时间 |
modify_by | varchar | 修改用户 |
resource_ids | varchar | 资源id集 |
② t_ds_process_instance(流程实例表):
字段 | 类型 | 注释 |
id | int | 主键 |
name | varchar | 流程实例名称 |
process_definition_id | int | 流程定义id |
state | tinyint | 流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
recovery | tinyint | 流程实例容错标识:0 正常,1 需要被容错重启 |
start_time | datetime | 流程实例开始时间 |
end_time | datetime | 流程实例结束时间 |
run_times | int | 流程实例运行次数 |
host | varchar | 流程实例所在的机器 |
command_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
command_param | text | 命令的参数(json格式) |
task_depend_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
max_try_times | tinyint | 最大重试次数 |
failure_strategy | tinyint | 失败策略 0 失败后结束,1 失败后继续 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组id |
schedule_time | datetime | 预期运行时间 |
command_start_time | datetime | 开始命令时间 |
global_params | text | 全局参数(固化流程定义的参数) |
process_instance_json | longtext | 流程实例json(copy的流程定义的json) |
flag | tinyint | 是否可用,1 可用,0不可用 |
update_time | timestamp | 更新时间 |
is_sub_process | int | 是否是子工作流 1 是,0 不是 |
executor_id | int | 命令执行用户 |
locations | text | 节点坐标信息 |
connects | text | 节点连线信息 |
history_cmd | text | 历史命令,记录所有对流程实例的操作 |
dependence_schedule_times | text | 依赖节点的预估时间 |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
timeout | int | 超时时间 |
tenant_id | int | 租户id |
③ t_ds_task_instance(任务实例表):
字段 | 类型 | 注释 |
id | int | 主键 |
name | varchar | 任务名称 |
task_type | varchar | 任务类型 |
process_definition_id | int | 流程定义id |
process_instance_id | int | 流程实例id |
task_json | longtext | 任务节点json |
state | tinyint | 任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
submit_time | datetime | 任务提交时间 |
start_time | datetime | 任务开始时间 |
end_time | datetime | 任务结束时间 |
host | varchar | 执行任务的机器 |
execute_path | varchar | 任务执行路径 |
log_path | varchar | 任务日志路径 |
alert_flag | tinyint | 是否告警 |
retry_times | int | 重试次数 |
pid | int | 进程pid |
app_link | varchar | yarn app id |
flag | tinyint | 是否可用:0 不可用,1 可用 |
retry_interval | int | 重试间隔 |
max_retry_times | int | 最大重试次数 |
task_instance_priority | int | 任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
④ t_ds_schedules(流程定时调度表):
字段 | 类型 | 注释 |
id | int | 主键 |
process_definition_id | int | 流程定义id |
start_time | datetime | 调度开始时间 |
end_time | datetime | 调度结束时间 |
crontab | varchar | crontab 表达式 |
failure_strategy | tinyint | 失败策略: 0 结束,1 继续 |
user_id | int | 用户id |
release_state | tinyint | 状态:0 未上线,1 上线 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组id |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
create_time | datetime | 创建时间 |
update_time | datetime | 更新时间 |
⑤ t_ds_command(执行命令表):
字段 | 类型 | 注释 |
id | int | 主键 |
command_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
process_definition_id | int | 流程定义id |
command_param | text | 命令的参数(json格式) |
task_depend_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
failure_strategy | tinyint | 失败策略:0结束,1继续 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组 |
schedule_time | datetime | 预期运行时间 |
start_time | datetime | 开始时间 |
executor_id | int | 执行用户id |
dependence | varchar | 依赖字段 |
update_time | datetime | 更新时间 |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
05 文末
本文是个人阅读DolphinScheduler
一些见解,后续或许会再更新,DolphinScheduler
给我的感觉就是联通性不强,需要结合流程图才能去理解。最后在补上它的流程图,以做回顾吧
接下来的文章讲讲它的配置。