01 引言
最近刚好看到一款 Flink 二次开发的开源框架 Dinky,简单看了其官网的描述,似乎功能很全,经过拆分项目,最终本地跑起来了,运行如下:
前端 | 后端 |
浏览器登录之后如下:
使用了一阵,发现了有不少的bug
,点击几下就报错了,但是问题不大,本着学习的态度,主要学习其核心的功能点,本文以Dinky任务提交的流程分析一下其源码。
02 目录结构
参考的文档:http://www.dlink.top/docs/0.7/developer_guide/local_debug
经过整理之后,Dinky后端的目录结构如下:
├ dlink 父项目 ├── dlink-admin 【管理中心】:标准的 SpringBoot 应用,负责与前端 react 交互 ├── dlink-alert 【告警中心】:集成钉钉 、企业微信 、飞书 、邮箱 │ ├── dlink-alert-base │ ├── dlink-alert-dingtalk │ ├── dlink-alert-email │ ├── dlink-alert-feishu ├── dlink-app 【flink entrypoint jar】:Dlink 在 Yarn Application 模式所使用的简化解析包 │ ├── dlink-app-1.11 │ ├── dlink-app-1.12 │ ├── dlink-app-1.13 │ ├── dlink-app-1.14 │ ├── dlink-app-1.15 │ ├── dlink-app-1.16 │ ├── dlink-app-1.17 │ └── dlink-app-base ├── dlink-assembly 【打包配置】:最终 tar.gz 的打包内容 ├── dlink-catalog │ └── dlink-catalog-mysql │ ├── dlink-catalog-mysql-1.13 │ ├── dlink-catalog-mysql-1.14 │ ├── dlink-catalog-mysql-1.15 │ ├── dlink-catalog-mysql-1.16 │ └── dlink-catalog-mysql-1.17 ├── dlink-client 【Client中心】用来桥接 Dlink 与不同版本的 Flink 运行环境 │ ├── dlink-client-1.11 │ ├── dlink-client-1.12 │ ├── dlink-client-1.13 │ ├── dlink-client-1.14 │ ├── dlink-client-1.15 │ ├── dlink-client-1.16 │ ├── dlink-client-1.17 │ ├── dlink-client-base │ └── dlink-client-hadoop ├── dlink-common 【公用模块】 ├── dlink-connectors 【自定义connector】 │ ├── dlink-connector-doris-1.13 │ ├── dlink-connector-jdbc-1.11 │ ├── dlink-connector-jdbc-1.12 │ ├── dlink-connector-jdbc-1.13 │ ├── dlink-connector-jdbc-1.14 │ ├── dlink-connector-phoenix-1.13 │ ├── dlink-connector-phoenix-1.14 │ └── dlink-connector-pulsar-1.14 ├── dlink-core 【执行中心】 ├── dlink-daemon 【任务监听线程】 ├── dlink-doc 【打包资源】:包含启动脚本、sql脚本、配置文件等 ├── dlink-executor 【执行模块】:从 dlink-core 中拆分出来,内含最核心的 Executor、Interceptor、Operation 等实现 ├── dlink-flink 【flink运行时依赖的环境】 │ ├── dlink-flink-1.11 │ ├── dlink-flink-1.12 │ ├── dlink-flink-1.13 │ ├── dlink-flink-1.14 │ ├── dlink-flink-1.15 │ ├── dlink-flink-1.16 │ └── dlink-flink-1.17 ├── dlink-function 【自定义函数】 ├── dlink-gateway【任务网关】:负责把实现不同执行模式的任务提交与管理,目前主要包含 Yarn PerJob 和 Application ├── dlink-metadata 【元数据中心】:用于实现各种外部数据源对接到 Dlink,以此使用其各种查询、执行等能力,未来用于 Flink Catalog 的预装载等。 │ ├── dlink-metadata-base │ ├── dlink-metadata-clickhouse │ ├── dlink-metadata-doris │ ├── dlink-metadata-hive │ ├── dlink-metadata-mysql │ ├── dlink-metadata-oracle │ ├── dlink-metadata-phoenix │ ├── dlink-metadata-postgresql │ ├── dlink-metadata-presto │ ├── dlink-metadata-sqlserver │ └── dlink-metadata-starrocks ├── dlink-process 【流程管理?】 ├── dlink-scheduler 【应该是对接DolphinScheduler的】 ├── docker 【相关dockerfile脚本】 │ ├── mysql │ ├── server │ └── web ├── docs 【相关文档】 └── dlink-web 【dlink前端,React框架】
在了解完目录结构后,接下来可以进入本文的核心部分,一起去分析Dinky
的作业提交流程。
03 源码分析
在进行源码分析前,有必要看看官网贴出的,在不同执行模式下,任务提交的流程。这里复制官网所说的任务执行路线:
Local模式:
同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> LocalStreamExecutor ==> CustomTableEnvironmentImpl ==> LocalEnvironment
Standalone模式:
注册集群实例 ==> 同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> RemoteStreamExecutor ==> CustomTableEnvironmentImpl ==> RemoteEnvironment ==> JobGraph ==> Flink Standalone Cluster
Yarn Session模式:
注册集群实例 ==> 同步执行/异步提交 ==> StudioService ==> JobManager ==> Executor ==> RemoteStreamExecutor ==> CustomTableEnvironmentImpl ==> RemoteEnvironment ==> JobGraph ==> Flink Yarn Session Cluster
Yarn Per-Job模式:
注册集群配置 ==> 异步提交 ==> StudioService ==> JobManager ==> Executor ==> JobGraph ==> Gateway ==> YarnPerJobGateway ==> YarnClient ==> Flink Yarn Per-Job Cluster
Yarn Application模式:
注册集群配置 ==> 异步提交 ==> StudioService ==> JobManager ==> Executor ==> TaskId & JDBC ==> Gateway ==> YarnApplicationGateway ==> YarnClient ==> dlink-app.jar ==> Executor ==> AppStreamExecutor ==> CustomTableEnvironmentImpl ==> LocalEnvironmentFlink Yarn Application Cluster
Yarn Application 模式是我们日常开发中用的最多的,我们以此为例子,看看整个流程。
3.1 前端
前端的操作流程大致如下,图片里已有描述:
我们最终需要知道的是提交的内容,浏览器右键的cURL内容如下:
curl 'http://localhost:8000/api/task/submit' \ -H 'Accept: application/json' \ -H 'Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' \ -H 'Connection: keep-alive' \ -H 'Content-Type: application/json;charset=UTF-8' \ -H 'Cookie: tenantId=1; satoken=8733fc5d-c12b-497a-9e56-ad1991e2eef3' \ -H 'Origin: http://localhost:8000' \ -H 'Referer: http://localhost:8000/' \ -H 'Sec-Fetch-Dest: empty' \ -H 'Sec-Fetch-Mode: cors' \ -H 'Sec-Fetch-Site: same-origin' \ -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' \ -H 'sec-ch-ua: "Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"' \ -H 'sec-ch-ua-mobile: ?0' \ -H 'sec-ch-ua-platform: "macOS"' \ -H 'tenantId: 1' \ --data-raw '{"0":2}' \ --compressed
3.2 管理端
3.2.1 submitTask (提交的接口)
所属模块:dlink-admin
所属类#方法:com.dlink.controller.TaskController#submit
代码如下(含注释):
/** * 任务提交接口 * * @param para 提交参数(任务ID数组) * @return 提交结果 * @author : YangLinWei * @createTime: 2023/7/14 16:24 */ @PostMapping(value = "/submit") public Result submit(@RequestBody JsonNode para) throws Exception { if (para.size() > 0) { List<JobResult> results = new ArrayList<>(); List<Integer> error = new ArrayList<>(); for (final JsonNode item : para) { Integer id = item.asInt(); // 提交任务 JobResult result = taskService.submitTask(id); if (!result.isSuccess()) { error.add(id); } results.add(result); } if (error.size() == 0) { return Result.succeed(results, "执行成功"); } else { return Result.succeed(results, "执行部分成功,但" + error.toString() + "执行失败,共" + error.size() + "次失败。"); } } else { return Result.failed("请选择要执行的记录"); } }
3.2.2 submitTask(提交的主体流程)
所属模块:dlink-admin
所属类#方法:com.dlink.service.impl.TaskServiceImpl#submitTask
可以看到里面使用taskService来提交任务,继续进入com.dlink.service.impl.TaskServiceImpl#submitTask
,可以看到代码如下(已写注释):
/** * 提交作业任务 * * @param id 任务ID * @return 任务结果 * @author : YangLinWei * @createTime: 2023/7/14 16:24 */ @Override public JobResult submitTask(Integer id) { // 1. 获取任务信息 Task task = this.getTaskInfoById(id); Asserts.checkNull(task, Tips.TASK_NOT_EXIST); // 2. 判断是否为flink任务 if (Dialect.notFlinkSql(task.getDialect())) { // 2.1 如果不是flink任务,执行普通的sql(本文不分析) return executeCommonSql(SqlDTO.build(task.getStatement(), task.getDatabaseId(), null)); } // 3. 初始化进程实例(主要用作控制台显示日志) ProcessEntity process = null; if (StpUtil.isLogin()) { // 3.1 如果用户已经登录,设置process进入ProcessContextHolder上下文,下次用户登录进来就可以继续看日志了 process = ProcessContextHolder.registerProcess( ProcessEntity.init(ProcessType.FLINKSUBMIT, StpUtil.getLoginIdAsInt())); } else { // 3.2 如果没登录,则则直接打印 process = ProcessEntity.NULL_PROCESS; } // 4. 根据任务,配置flink 作业配置 process.info("Initializing Flink job config..."); JobConfig config = buildJobConfig(task); // 5. 判断执行模式是否为 kubernetes-application if (GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) { // 5.1 构建并加载Docker镜像 loadDocker(id, config.getClusterConfigurationId(), config.getGatewayConfig()); } // 6. 构建JobManager JobManager jobManager = JobManager.build(config); // 7. 执行流程记录(start) process.start(); if (!config.isJarTask()) { // 如果不是jar任务,JobManager 提交执行flink sql JobResult jobResult = jobManager.executeSql(task.getStatement()); process.finish("Submit Flink SQL finished."); return jobResult; } else { // 如果是jar任务,JobManager 提交执行执行jar JobResult jobResult = jobManager.executeJar(); process.finish("Submit Flink Jar finished."); return jobResult; } }
上述的代码不多,但是有很多逻辑,我们主要分析flink jar 的流程,其实可以精简的理解为:
==> getTaskInfoById(taskId):查询任务的配置,以及关联的作业实例 ==> buildJobConfig(Task task):根据任务信息构造flink作业配置(例如:执行jar的路径、savepoint配置等,也就是提交界面右侧的作业配置) ==> JobManager.build(config):根据作业配置,初始化JobManager,主要是初始化了提交作业的GateWay配置、初始化JobHandler(Job2MysqlHandler)、构造Executor(LocalStreamExecutor) ==> jobManager.executeJar():执行jar作业
从上述的流程描述,可以看到执行jar作业的核心代码在:com.dlink.job.JobManager#executeJar
。
3.2.3 executeJar (执行jar作业)
所属模块:dlink-core
所属类#方法:com.dlink.job.JobManager#executeJar
执行作业的代码流程如下:
/** * 执行jar作业 * * @return 作业提交结果 * @author : YangLinWei * @createTime: 2023/7/14 16:24 */ public JobResult executeJar() { // 从上下文获取流程实例(主要用作记录打印) ProcessEntity process = ProcessContextHolder.getProcess(); // 初始化作业配置 Job job = Job.init(runMode, config, executorSetting, executor, null, useGateway); // 把当前作业设置进上下文 JobContextHolder.setJob(job); // 准备提交的工作(主要是执行JobHandler的init方法) ready(); try { // 构造GateWay(YarnApplicationGateWay),并提交Jar GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(); // 下面的逻辑便是设置作业提交后的结果,并返回给前端 job.setResult(InsertResult.success(gatewayResult.getAppId())); job.setJobId(gatewayResult.getAppId()); job.setJids(gatewayResult.getJids()); job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL())); job.setEndTime(LocalDateTime.now()); if (gatewayResult.isSucess()) { job.setStatus(Job.JobStatus.SUCCESS); success(); } else { job.setError(gatewayResult.getError()); job.setStatus(Job.JobStatus.FAILED); failed(); } } catch (Exception e) { String error = LogUtil.getError( "Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e); job.setEndTime(LocalDateTime.now()); job.setStatus(Job.JobStatus.FAILED); job.setError(error); failed(); process.error(error); } finally { close(); } return job.getJobResult(); }
从上述的代码可以看到,用到了两个核心的类,分别是JobHandler
和GateWay
。
3.3.3 JobHandler (作业处理器)
所属模块:dlink-core
所属类#方法:com.dlink.job.JobHandler#ready()
从上述代码,可以知道上面的JobHandler
(Job2MysqlHandler
实现)记录了执行的流程,依次如下:
ready() => success()/failed()
其中接口如下:
/** * 作业生命周期处理器 * * @author : YangLinWei * @createTime: 2023/7/14 23:31 * @version: 1.0.0 */ public interface JobHandler { /** * 初始化(记录作业历史等) */ boolean init(); /** * 准备提交前出发的操作