深度剖析Dinky源码(下)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 深度剖析Dinky源码(下)
3.2.4 Gateway (作业提交客户端)

所属模块dlink-gateway

所属类#方法com.dlink.gateway.yarn.YarnApplicationGateway#submitJar


通过断点可以得知jar作业,使用的是YarnApplicationGateway的submitJar去提交。

我们主要看看里面的实现(已补充注释):

/**
 * 提交jar作业
 *
 * @return 提交结果
 * @author : YangLinWei
 * @createTime: 2023/7/15 10:59
 */
@Override
public GatewayResult submitJar() {
    // 判断并初始化yarn客户端
    if (Asserts.isNull(yarnClient)) {
        init();
    }
    // 构造提交信息
    YarnResult result = YarnResult.build(getType());
    AppConfig appConfig = config.getAppConfig();
    configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
    String[] userJarParas = appConfig.getUserJarParas();
    if (Asserts.isNull(userJarParas)) {
        userJarParas = new String[0];
    }
    ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas,
            appConfig.getUserJarMainAppClass());
    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
            configuration, yarnConfiguration, yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
    ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder = new ClusterSpecification.ClusterSpecificationBuilder();
    if (configuration.contains(JobManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
        clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS))
                .createClusterSpecification();
    }
    if (Asserts.isNotNull(config.getJarPaths())) {
        yarnClusterDescriptor
                .addShipFiles(Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
    }
    try {
        // 开始提交信息到yarn集群
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                clusterSpecificationBuilder.createClusterSpecification(),
                applicationConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        // 封装提交后返回的信息
        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
        int counts = SystemConfiguration.getInstances().getJobIdWait();
        while (jobStatusMessages.size() == 0 && counts > 0) {
            Thread.sleep(1000);
            counts--;
            jobStatusMessages = clusterClient.listJobs().get();
            if (jobStatusMessages.size() > 0) {
                break;
            }
        }
        if (jobStatusMessages.size() > 0) {
            List<String> jids = new ArrayList<>();
            for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
                jids.add(jobStatusMessage.getJobId().toHexString());
            }
            result.setJids(jids);
        }
        ApplicationId applicationId = clusterClient.getClusterId();
        result.setAppId(applicationId.toString());
        result.setWebURL(clusterClient.getWebInterfaceURL());
        result.success();
    } catch (Exception e) {
        result.fail(LogUtil.getError(e));
    } finally {
        yarnClusterDescriptor.close();
    }
    return result;
}

可以发现,最终还是使用flink的flink-yarn_xxx.jar包里面的YarnClusterDescriptor提交作业到yarn,最终yarn会执行指定提交的jar包。

到这里,flink的jar作业提交模式是结束了,到这里,可以结束本文的阅读了。


读者如果有兴趣的话,可以继续阅读jar提交到yarn之后的流程,我们看看flink sql模式下是怎样实现的?flink sql模式也是一样的,使用了指定的启动jar(也就是dlink-app.jar ),然后传入任务参数,执行一系列的流程。

3.4 yarn端

再次回顾看看官网是怎么描述Yarn Application下如何提交jar的:

注册集群配置 
==> 异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> TaskId & JDBC 
==> Gateway 
==> YarnApplicationGateway
==> YarnClient 
==> dlink-app.jar 
==> Executor 
==> AppStreamExecutor 
==> CustomTableEnvironmentImpl 
==> LocalEnvironmentFlink Yarn Application Cluster

似乎有点晦涩,这是我整理后对应到实际代码的流程:

前端提交接口:/api/task/submit
==> 【dlink-admin模块】:com.dlink.controller.TaskController#submit
   - 描述:后端controller提交接口
==> 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask
     - 描述:提交作业服务
==> 【dlink-core模块】:com.dlink.job.JobManager#executeJar
     - 描述:作业管理器提交jar作业
==> 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar
     - 描述:提交客户端提交
==> 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster
     - 描述:Flink yarn客户端提交源码

对比官方的流程,似乎到了==> dlink-app.jar 这一步骤就停止了,其实,dlink-app.jar就是flink作业的执行jar(entrypoint jar),需要手动上传到hdfs,具体得配置在界面:

提交作业到yarn后,创建容器时,会自动从hdfs下载 dlink-app.jar,然后启动jar,也就是==> dlink-app.jar 后面的逻辑,接下来讲讲其实现逻辑。

3.4.1 main(作业执行入口)

所属模块dlink-app

所属类#方法com.dlink.app.MainApp#main


/**
 * 作业执行入口
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:37
 */
public static void main(String[] args) throws IOException {
    Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
    String id = params.get(FlinkParamConstant.ID);
    Asserts.checkNullString(id, "请配置入参 id ");
    // 初始化数据库配置
    DBConfig dbConfig = DBConfig.build(params);
    // 提交
    Submiter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR));
}

从描述,可以看出,最终是使用了Submiter去提交作业。

3.4.2 submit(作业执行入口)

所属模块dlink-app-base

所属类#方法com.dlink.app.flinksql.Submiter#submit


/**
 * 提交任务
 *
 * @param id        任务ID
 * @param dbConfig  数据库连接配置
 * @param dinkyAddr 第三方jar下载,对应对象存储服务器的域名
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:55
 */
public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
    logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
    if (NULL.equals(dinkyAddr)) {
        dinkyAddr = "";
    }
    StringBuilder sb = new StringBuilder();
    // 根据任务ID获取任务配置
    Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
    if (Asserts.isNotNull(taskConfig.get("envId"))) {
        String envId = getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig);
        if (Asserts.isNotNullString(envId)) {
            sb.append(envId);
        }
        sb.append("\n");
    }
    // 添加数据源全局变量
    sb.append(getDbSourceSqlStatements(dbConfig, id));
    // 添加自定义全局变量信息
    sb.append(getFlinkSQLStatement(id, dbConfig));
    // 拆分SQL字符串为sql集
    List<String> statements = Submiter.getStatements(sb.toString());
    ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);
    // 加载第三方jar
    loadDep(taskConfig.get("type"), id, dinkyAddr, executorSetting);
    String uuid = UUID.randomUUID().toString().replace("-", "");
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
    }
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
    }
    logger.info("作业配置如下: {}", executorSetting);
    // 根据配置,初始化Executor
    Executor executor = Executor.buildAppStreamExecutor(executorSetting);
    List<StatementParam> ddl = new ArrayList<>();
    List<StatementParam> trans = new ArrayList<>();
    List<StatementParam> execute = new ArrayList<>();
    // 遍历执行flink sql
    for (String item : statements) {
        String statement = FlinkInterceptor.pretreatStatement(executor, item);
        if (statement.isEmpty()) {
            continue;
        }
        SqlType operationType = Operations.getOperationType(statement);
        if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
            trans.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else if (operationType.equals(SqlType.EXECUTE)) {
            execute.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else {
            ddl.add(new StatementParam(statement, operationType));
        }
    }
    // 执行器依次执行flink sql
    for (StatementParam item : ddl) {
        logger.info("正在执行 FlinkSQL: " + item.getValue());
        executor.submitSql(item.getValue());
        logger.info("执行成功");
    }
    if (trans.size() > 0) {
        if (executorSetting.isUseStatementSet()) {
            List<String> inserts = new ArrayList<>();
            for (StatementParam item : trans) {
                if (item.getType().equals(SqlType.INSERT)) {
                    inserts.add(item.getValue());
                }
            }
            logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
            executor.submitStatementSet(inserts);
            logger.info("执行成功");
        } else {
            for (StatementParam item : trans) {
                logger.info("正在执行 FlinkSQL: " + item.getValue());
                executor.submitSql(item.getValue());
                logger.info("执行成功");
                break;
            }
        }
    }
    if (execute.size() > 0) {
        List<String> executes = new ArrayList<>();
        for (StatementParam item : execute) {
            executes.add(item.getValue());
            executor.executeSql(item.getValue());
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        }
        logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
        try {
            executor.execute(executorSetting.getJobName());
            logger.info("执行成功");
        } catch (Exception e) {
            logger.error("执行失败, {}", e.getMessage(), e);
        }
    }
    logger.info("{}任务提交成功", LocalDateTime.now());
}

从上述代码,可以得知核心的代码是初始化ExecutorAppStreamExecutor实现)之后,然后依次执行拆分后的flink sql。

3.4.2 submitSql(sql执行入口)

所属模块dlink-executor

所属类#方法com.dlink.executor.AppStreamExecutor


/**
 * Streaming执行器
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 12:02
 */
public class AppStreamExecutor extends Executor {
    /**
     * 构造函数,初始化flink默认的TableEnvironment
     *
     * @param executorSetting 执行器配置
     */
    public AppStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        if (Asserts.isNotNull(executorSetting.getConfig())) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        } else {
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        }
        init();
    }
    /**
     * 公共的逻辑都在Executor ,不同的Executor区别在于这里的TableEnvironment
     *
     * @return 自定义的TableEnvironment
     */
    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.create(environment);
    }
}

执行逻辑在基类Executor执行,就是取Executor实现类(AppStreamExecutor)里面的自定义TableEnvironment执行:

继续看看CustomTableEnvironment是如何实现的?

3.4.3 executeSql(执行flink sql)

所属模块dlink-client

所属类#方法com.dlink.executor.CustomTableEnvironmentImpl


这里应该到了flink底层之上的最底层了,有兴趣的同学可以自行阅读,篇幅有限本文不再分析了,总之按flink的标准来实现就好了。

04 总结

最终,整理后的流程如下:

4.1 前端

step1:【dlink-web模块】:dinky-web/src/components/Studio/StudioMenu/index.tsx#submit

  • 描述:提交接口 ,/api/task/submit

4.2 管理端

step1: 【dlink-admin模块】:com.dlink.controller.TaskController#submit

  • 描述:后端controller提交接口

step2: 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask

  • 描述:提交作业服务

step3: 【dlink-core模块】:com.dlink.job.JobManager#executeJar

  • 描述:作业管理器提交jar作业

step4: 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar

  • 描述:提交客户端提交

step5: 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster

  • 描述:Flink yarn客户端提交源码

4.3 yarn端

step1: 【dlink-app模块】:com.dlink.app.MainApp#main

  • 描述:执行jar包入口,所有的flink sql作业都在这里开始

step2: 【dlink-app-base模块】:com.dlink.app.flinksql.Submiter#submit

  • 描述:作业提交器,Executor的初始化,并执行

step3: 【dlink-executor模块】:com.dlink.executor.AppStreamExecutor

  • 描述:作业执行器,初始化TableEnvironment,并执行

step4: 【dlink-client模块】:com.dlink.executor.CustomTableEnvironmentImpl

  • 描述:自定义TableEnvironment,实际执行flink sql的逻辑,再进一步就是flink的底层了。

05 文末

本文主要讲解了Dinky的一些概念,以及剖析了它的源码,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
API 数据处理 调度
DolphinScheduler教程(03)- 源码分析(二)
DolphinScheduler教程(03)- 源码分析(二)
751 0
|
资源调度 前端开发 Java
深度剖析Dinky源码(上)
深度剖析Dinky源码
853 0
|
资源调度 Kubernetes Java
Flink--2、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
Flink--2、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
|
SQL 存储 Apache
Paimon 实践 | 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
Paimon 实践 | 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
4876 59
|
SQL 消息中间件 Java
Flink报错问题之使用debezium-json format报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL Java Apache
Flink内存问题之内存溢出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
Oracle 关系型数据库 Java
Flink CDC产品常见问题之Flink CDC 使用jar包启动异常如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
前端开发 Java 测试技术
Dinky 打包分析(进一步了解maven)
Dinky 打包分析(进一步了解maven)
407 0
|
SQL Java 数据库
flink cdc多种数据源安装、配置与验证(超详细总结)(下)
flink cdc多种数据源安装、配置与验证(超详细总结)(下)
1111 0

热门文章

最新文章