3.2.4 Gateway (job submission client)
Module: dlink-gateway
Class#method: com.dlink.gateway.yarn.YarnApplicationGateway#submitJar
Stepping through with a breakpoint shows that jar jobs are submitted via YarnApplicationGateway#submitJar.
Let's look at its implementation (comments added):
/**
 * Submit a jar job.
 *
 * @return submission result
 * @author : YangLinWei
 * @createTime: 2023/7/15 10:59
 */
@Override
public GatewayResult submitJar() {
    // Initialize the yarn client if it has not been created yet
    if (Asserts.isNull(yarnClient)) {
        init();
    }

    // Build the submission information
    YarnResult result = YarnResult.build(getType());
    AppConfig appConfig = config.getAppConfig();
    configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
    String[] userJarParas = appConfig.getUserJarParas();
    if (Asserts.isNull(userJarParas)) {
        userJarParas = new String[0];
    }
    ApplicationConfiguration applicationConfiguration =
            new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
            configuration, yarnConfiguration, yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient), true);

    ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder =
            new ClusterSpecification.ClusterSpecificationBuilder();
    if (configuration.contains(JobManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
        clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS))
                .createClusterSpecification();
    }

    if (Asserts.isNotNull(config.getJarPaths())) {
        yarnClusterDescriptor
                .addShipFiles(Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
    }

    try {
        // Submit to the yarn cluster
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                clusterSpecificationBuilder.createClusterSpecification(),
                applicationConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

        // Wrap up the information returned after submission
        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
        int counts = SystemConfiguration.getInstances().getJobIdWait();
        while (jobStatusMessages.size() == 0 && counts > 0) {
            Thread.sleep(1000);
            counts--;
            jobStatusMessages = clusterClient.listJobs().get();
            if (jobStatusMessages.size() > 0) {
                break;
            }
        }
        if (jobStatusMessages.size() > 0) {
            List<String> jids = new ArrayList<>();
            for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
                jids.add(jobStatusMessage.getJobId().toHexString());
            }
            result.setJids(jids);
        }
        ApplicationId applicationId = clusterClient.getClusterId();
        result.setAppId(applicationId.toString());
        result.setWebURL(clusterClient.getWebInterfaceURL());
        result.success();
    } catch (Exception e) {
        result.fail(LogUtil.getError(e));
    } finally {
        yarnClusterDescriptor.close();
    }
    return result;
}
As we can see, the actual submission to YARN is still done by Flink's own YarnClusterDescriptor from the flink-yarn_xxx.jar package; YARN then runs the jar that was submitted.
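To separate what belongs to Dinky from what belongs to Flink, here is a minimal, self-contained sketch of deploying an application-mode cluster with the flink-yarn API alone. The HDFS jar path, the memory sizes and the main class are placeholder assumptions, not values from Dinky:

import java.util.Collections;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class BareYarnApplicationSubmit {

    public static void main(String[] args) throws Exception {
        // Flink-side configuration: target = yarn-application, user jar on HDFS (placeholder path)
        Configuration flinkConf = new Configuration();
        flinkConf.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        flinkConf.set(PipelineOptions.JARS, Collections.singletonList("hdfs:///jobs/my-job.jar"));

        // Hadoop/YARN client
        YarnConfiguration yarnConf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConf);
        yarnClient.start();

        YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
                flinkConf, yarnConf, yarnClient,
                YarnClientYarnClusterInformationRetriever.create(yarnClient), true);

        ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1)
                .createClusterSpecification();

        // Program arguments and entrypoint class of the user jar (placeholders)
        ApplicationConfiguration appConf =
                new ApplicationConfiguration(new String[0], "com.example.MyJobMain");

        try (ClusterClient<ApplicationId> client =
                     descriptor.deployApplicationCluster(spec, appConf).getClusterClient()) {
            System.out.println("Submitted application: " + client.getClusterId());
        } finally {
            descriptor.close();
        }
    }
}

Everything Dinky adds on top of this (building the configuration from the task, shipping extra jars, polling for job ids, wrapping the result) is what we just saw in submitJar.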
At this point the jar-submission path is complete, and readers who only care about jar jobs can stop reading here. If you are interested in what happens after the jar reaches YARN, read on and let's see how the Flink SQL mode is implemented. The Flink SQL mode works the same way, except that it submits a dedicated bootstrap jar (dlink-app.jar), passes the task parameters to it, and lets that jar drive the rest of the process.
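In other words, the SQL path reuses the same YarnClusterDescriptor submission shown above; only the user jar and the program arguments change. A rough sketch of that idea follows; the HDFS path and the --id / --dinky-addr flag spellings are hypothetical placeholders, not the actual Dinky constants:

import java.util.Collections;

import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class SqlModeSubmissionSketch {

    // Conceptual only: for Flink SQL jobs the gateway still submits a jar, but the jar is
    // the fixed bootstrap dlink-app.jar and the program arguments carry the task id.
    // "--id" / "--dinky-addr" are hypothetical flag spellings; the real ones come from
    // FlinkParamConstant in the dlink-app module.
    static ApplicationConfiguration sqlModeAppConfig(Configuration flinkConf) {
        flinkConf.set(PipelineOptions.JARS,
                Collections.singletonList("hdfs:///dlink/jar/dlink-app.jar")); // placeholder path
        return new ApplicationConfiguration(
                new String[] {"--id", "1", "--dinky-addr", "http://dinky-host:8888"},
                "com.dlink.app.MainApp");
    }
}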
3.4 YARN side
Let's first revisit how the official documentation describes jar submission under Yarn Application mode:
Register cluster configuration ==> Asynchronous submission ==> StudioService ==> JobManager ==> Executor ==> TaskId & JDBC ==> Gateway ==> YarnApplicationGateway ==> YarnClient ==> dlink-app.jar ==> Executor ==> AppStreamExecutor ==> CustomTableEnvironmentImpl ==> LocalEnvironment (Flink Yarn Application Cluster)
That reads a bit cryptically, so here is the same flow mapped to the actual code, as I have sorted it out:
Front-end submit API: /api/task/submit
==> 【dlink-admin module】: com.dlink.controller.TaskController#submit - back-end controller submit endpoint
==> 【dlink-admin module】: com.dlink.service.impl.TaskServiceImpl#submitTask - job submission service
==> 【dlink-core module】: com.dlink.job.JobManager#executeJar - the job manager submits the jar job
==> 【dlink-gateway module】: com.dlink.gateway.yarn.YarnApplicationGateway#submitJar - submission via the gateway client
==> 【flink-yarn_xxx source】: org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster - Flink's own YARN client submission code
Compared with the official flow, my mapping above seems to stop at the "==> dlink-app.jar" step. In fact, dlink-app.jar is the entrypoint jar of the Flink job: it has to be uploaded to HDFS manually, and the corresponding path is configured on the Dinky web UI.
After the job is submitted to YARN, the container that YARN creates automatically downloads dlink-app.jar from HDFS and launches it; this is exactly the logic that follows the "==> dlink-app.jar" step. The next sections walk through how that is implemented.
3.4.1 main (job execution entrypoint)
Module: dlink-app
Class#method: com.dlink.app.MainApp#main
/**
 * Job execution entrypoint.
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:37
 */
public static void main(String[] args) throws IOException {
    Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
    String id = params.get(FlinkParamConstant.ID);
    Asserts.checkNullString(id, "请配置入参 id ");

    // Initialize the database configuration
    DBConfig dbConfig = DBConfig.build(params);

    // Submit
    Submiter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR));
}
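FlinkBaseUtil.getParamsFromArgs is not shown in this article; presumably it just turns "--key value" style arguments into a map. A minimal sketch of that idea, using Flink's own ParameterTool rather than the actual Dinky helper (flag names are hypothetical):

import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParamsSketch {

    // Roughly what a helper like FlinkBaseUtil.getParamsFromArgs is assumed to do:
    // parse "--key value" pairs from the program arguments into a plain map.
    static Map<String, String> getParamsFromArgs(String[] args) {
        return ParameterTool.fromArgs(args).toMap();
    }

    public static void main(String[] args) {
        // e.g. args = {"--id", "1", "--dinky-addr", "http://dinky-host:8888"} (hypothetical flags)
        Map<String, String> params = getParamsFromArgs(args);
        System.out.println(params);
    }
}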
As the code shows, the job is ultimately handed over to Submiter for submission.
3.4.2 submit (job submission entry)
Module: dlink-app-base
Class#method: com.dlink.app.flinksql.Submiter#submit
/**
 * Submit the task.
 *
 * @param id        task id
 * @param dbConfig  database connection configuration
 * @param dinkyAddr address for downloading third-party jars (domain of the object-storage server)
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:55
 */
public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
    logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
    if (NULL.equals(dinkyAddr)) {
        dinkyAddr = "";
    }
    StringBuilder sb = new StringBuilder();

    // Load the task configuration by task id
    Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
    if (Asserts.isNotNull(taskConfig.get("envId"))) {
        String envId = getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig);
        if (Asserts.isNotNullString(envId)) {
            sb.append(envId);
        }
        sb.append("\n");
    }
    // Append the data-source global variables
    sb.append(getDbSourceSqlStatements(dbConfig, id));
    // Append the user-defined global variables
    sb.append(getFlinkSQLStatement(id, dbConfig));

    // Split the SQL string into a list of statements
    List<String> statements = Submiter.getStatements(sb.toString());
    ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);

    // Load third-party jars
    loadDep(taskConfig.get("type"), id, dinkyAddr, executorSetting);

    String uuid = UUID.randomUUID().toString().replace("-", "");
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
    }
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
    }
    logger.info("作业配置如下: {}", executorSetting);

    // Initialize the Executor from the configuration
    Executor executor = Executor.buildAppStreamExecutor(executorSetting);

    List<StatementParam> ddl = new ArrayList<>();
    List<StatementParam> trans = new ArrayList<>();
    List<StatementParam> execute = new ArrayList<>();

    // Iterate over the Flink SQL statements and classify them by type
    for (String item : statements) {
        String statement = FlinkInterceptor.pretreatStatement(executor, item);
        if (statement.isEmpty()) {
            continue;
        }
        SqlType operationType = Operations.getOperationType(statement);
        if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
            trans.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else if (operationType.equals(SqlType.EXECUTE)) {
            execute.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else {
            ddl.add(new StatementParam(statement, operationType));
        }
    }

    // The executor runs the Flink SQL statements in order
    for (StatementParam item : ddl) {
        logger.info("正在执行 FlinkSQL: " + item.getValue());
        executor.submitSql(item.getValue());
        logger.info("执行成功");
    }
    if (trans.size() > 0) {
        if (executorSetting.isUseStatementSet()) {
            List<String> inserts = new ArrayList<>();
            for (StatementParam item : trans) {
                if (item.getType().equals(SqlType.INSERT)) {
                    inserts.add(item.getValue());
                }
            }
            logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
            executor.submitStatementSet(inserts);
            logger.info("执行成功");
        } else {
            for (StatementParam item : trans) {
                logger.info("正在执行 FlinkSQL: " + item.getValue());
                executor.submitSql(item.getValue());
                logger.info("执行成功");
                break;
            }
        }
    }
    if (execute.size() > 0) {
        List<String> executes = new ArrayList<>();
        for (StatementParam item : execute) {
            executes.add(item.getValue());
            executor.executeSql(item.getValue());
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        }
        logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
        try {
            executor.execute(executorSetting.getJobName());
            logger.info("执行成功");
        } catch (Exception e) {
            logger.error("执行失败, {}", e.getMessage(), e);
        }
    }
    logger.info("{}任务提交成功", LocalDateTime.now());
}
From the code above we can see that the core work is to initialize an Executor (the AppStreamExecutor implementation) and then execute the split Flink SQL statements one by one.
3.4.3 submitSql (SQL execution entry)
Module: dlink-executor
Class: com.dlink.executor.AppStreamExecutor
/**
 * Streaming executor.
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 12:02
 */
public class AppStreamExecutor extends Executor {

    /**
     * Constructor: initializes Flink's default TableEnvironment.
     *
     * @param executorSetting executor configuration
     */
    public AppStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        if (Asserts.isNotNull(executorSetting.getConfig())) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        } else {
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        }
        init();
    }

    /**
     * The shared logic lives in Executor; what differs between Executor implementations
     * is the TableEnvironment created here.
     *
     * @return the custom TableEnvironment
     */
    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.create(environment);
    }
}
The execution logic lives in the base class Executor: it simply takes the custom TableEnvironment created by the concrete Executor implementation (here AppStreamExecutor) and runs the SQL against it, as sketched below.
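The following is a rough sketch of what that delegation could look like; it is an assumption about the shape of the base class, not the real com.dlink.executor.Executor source, which carries far more logic (statement sets, interceptors, environment settings):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;

// Sketch only; assume this class sits next to the Dinky executor classes, so
// ExecutorSetting and CustomTableEnvironment are the dlink-executor/dlink-client types.
public abstract class ExecutorSketch {

    protected ExecutorSetting executorSetting;
    protected StreamExecutionEnvironment environment;
    protected CustomTableEnvironment stEnvironment;

    // Called by the subclass constructor once the StreamExecutionEnvironment is ready.
    protected void init() {
        // The subclass decides which TableEnvironment flavour to build.
        this.stEnvironment = createCustomTableEnvironment();
    }

    // Each Executor implementation only differs in the TableEnvironment it creates.
    abstract CustomTableEnvironment createCustomTableEnvironment();

    // submitSql() and friends just forward the statement to the custom TableEnvironment.
    public TableResult submitSql(String statement) {
        return stEnvironment.executeSql(statement);
    }
}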
Let's now look at how CustomTableEnvironment is implemented.
3.4.4 executeSql (executing Flink SQL)
Module: dlink-client
Class: com.dlink.executor.CustomTableEnvironmentImpl
At this point we have reached the lowest layer sitting directly on top of Flink itself. Readers who are interested can read CustomTableEnvironmentImpl on their own; for reasons of length it is not analyzed further here. In short, it is implemented the standard Flink way.
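For readers who want a reference point for "the standard Flink way", here is a minimal, self-contained example of what ultimately happens underneath: a TableEnvironment is built on top of the StreamExecutionEnvironment and the SQL is driven through executeSql(). The table definitions below are placeholders using the built-in datagen/print connectors, not anything from Dinky:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PlainFlinkSqlExample {

    public static void main(String[] args) {
        // The streaming environment that CustomTableEnvironmentImpl.create(environment) wraps.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DDL: a generated source and a print sink (placeholder tables for illustration).
        tableEnv.executeSql(
                "CREATE TABLE src (id INT) WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tableEnv.executeSql(
                "CREATE TABLE snk (id INT) WITH ('connector' = 'print')");

        // DML: the INSERT is what actually triggers job execution on the cluster.
        tableEnv.executeSql("INSERT INTO snk SELECT id FROM src");
    }
}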
04 Summary
To wrap up, the consolidated flow is as follows:
4.1 Front end
step1: 【dlink-web module】: dinky-web/src/components/Studio/StudioMenu/index.tsx#submit
- Description: submit entry, calls /api/task/submit
4.2 Admin side
step1: 【dlink-admin module】: com.dlink.controller.TaskController#submit
- Description: back-end controller submit endpoint
step2: 【dlink-admin module】: com.dlink.service.impl.TaskServiceImpl#submitTask
- Description: job submission service
step3: 【dlink-core module】: com.dlink.job.JobManager#executeJar
- Description: the job manager submits the jar job
step4: 【dlink-gateway module】: com.dlink.gateway.yarn.YarnApplicationGateway#submitJar
- Description: submission via the gateway client
step5: 【flink-yarn_xxx source】: org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster
- Description: Flink's own YARN client submission code
4.3 YARN side
step1: 【dlink-app module】: com.dlink.app.MainApp#main
- Description: entrypoint of the executed jar; every Flink SQL job starts here
step2: 【dlink-app-base module】: com.dlink.app.flinksql.Submiter#submit
- Description: job submitter; initializes the Executor and runs the statements
step3: 【dlink-executor module】: com.dlink.executor.AppStreamExecutor
- Description: job executor; initializes the TableEnvironment and executes against it
step4: 【dlink-client module】: com.dlink.executor.CustomTableEnvironmentImpl
- Description: the custom TableEnvironment where the Flink SQL is actually executed; one step further down is Flink itself
05 Closing words
This article walked through some of Dinky's concepts and dissected its source code. I hope it helps; thanks for reading, and that's the end of this article!