深度剖析Dinky源码(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 深度剖析Dinky源码(下)
3.2.4 Gateway (作业提交客户端)

所属模块dlink-gateway

所属类#方法com.dlink.gateway.yarn.YarnApplicationGateway#submitJar


通过断点可以得知jar作业,使用的是YarnApplicationGateway的submitJar去提交。

我们主要看看里面的实现(已补充注释):

/**
 * 提交jar作业
 *
 * @return 提交结果
 * @author : YangLinWei
 * @createTime: 2023/7/15 10:59
 */
@Override
public GatewayResult submitJar() {
    // 判断并初始化yarn客户端
    if (Asserts.isNull(yarnClient)) {
        init();
    }
    // 构造提交信息
    YarnResult result = YarnResult.build(getType());
    AppConfig appConfig = config.getAppConfig();
    configuration.set(PipelineOptions.JARS, Collections.singletonList(appConfig.getUserJarPath()));
    String[] userJarParas = appConfig.getUserJarParas();
    if (Asserts.isNull(userJarParas)) {
        userJarParas = new String[0];
    }
    ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(userJarParas,
            appConfig.getUserJarMainAppClass());
    YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
            configuration, yarnConfiguration, yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
    ClusterSpecification.ClusterSpecificationBuilder clusterSpecificationBuilder = new ClusterSpecification.ClusterSpecificationBuilder();
    if (configuration.contains(JobManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setMasterMemoryMB(configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
        clusterSpecificationBuilder
                .setTaskManagerMemoryMB(configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes());
    }
    if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
        clusterSpecificationBuilder.setSlotsPerTaskManager(configuration.get(TaskManagerOptions.NUM_TASK_SLOTS))
                .createClusterSpecification();
    }
    if (Asserts.isNotNull(config.getJarPaths())) {
        yarnClusterDescriptor
                .addShipFiles(Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
    }
    try {
        // 开始提交信息到yarn集群
        ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                clusterSpecificationBuilder.createClusterSpecification(),
                applicationConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        // 封装提交后返回的信息
        Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();
        int counts = SystemConfiguration.getInstances().getJobIdWait();
        while (jobStatusMessages.size() == 0 && counts > 0) {
            Thread.sleep(1000);
            counts--;
            jobStatusMessages = clusterClient.listJobs().get();
            if (jobStatusMessages.size() > 0) {
                break;
            }
        }
        if (jobStatusMessages.size() > 0) {
            List<String> jids = new ArrayList<>();
            for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
                jids.add(jobStatusMessage.getJobId().toHexString());
            }
            result.setJids(jids);
        }
        ApplicationId applicationId = clusterClient.getClusterId();
        result.setAppId(applicationId.toString());
        result.setWebURL(clusterClient.getWebInterfaceURL());
        result.success();
    } catch (Exception e) {
        result.fail(LogUtil.getError(e));
    } finally {
        yarnClusterDescriptor.close();
    }
    return result;
}

可以发现,最终还是使用flink的flink-yarn_xxx.jar包里面的YarnClusterDescriptor提交作业到yarn,最终yarn会执行指定提交的jar包。

到这里,flink的jar作业提交模式是结束了,到这里,可以结束本文的阅读了。


读者如果有兴趣的话,可以继续阅读jar提交到yarn之后的流程,我们看看flink sql模式下是怎样实现的?flink sql模式也是一样的,使用了指定的启动jar(也就是dlink-app.jar ),然后传入任务参数,执行一系列的流程。

3.4 yarn端

再次回顾看看官网是怎么描述Yarn Application下如何提交jar的:

注册集群配置 
==> 异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> TaskId & JDBC 
==> Gateway 
==> YarnApplicationGateway
==> YarnClient 
==> dlink-app.jar 
==> Executor 
==> AppStreamExecutor 
==> CustomTableEnvironmentImpl 
==> LocalEnvironmentFlink Yarn Application Cluster

似乎有点晦涩,这是我整理后对应到实际代码的流程:

前端提交接口:/api/task/submit
==> 【dlink-admin模块】:com.dlink.controller.TaskController#submit
   - 描述:后端controller提交接口
==> 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask
     - 描述:提交作业服务
==> 【dlink-core模块】:com.dlink.job.JobManager#executeJar
     - 描述:作业管理器提交jar作业
==> 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar
     - 描述:提交客户端提交
==> 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster
     - 描述:Flink yarn客户端提交源码

对比官方的流程,似乎到了==> dlink-app.jar 这一步骤就停止了,其实,dlink-app.jar就是flink作业的执行jar(entrypoint jar),需要手动上传到hdfs,具体得配置在界面:

提交作业到yarn后,创建容器时,会自动从hdfs下载 dlink-app.jar,然后启动jar,也就是==> dlink-app.jar 后面的逻辑,接下来讲讲其实现逻辑。

3.4.1 main(作业执行入口)

所属模块dlink-app

所属类#方法com.dlink.app.MainApp#main


/**
 * 作业执行入口
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:37
 */
public static void main(String[] args) throws IOException {
    Map<String, String> params = FlinkBaseUtil.getParamsFromArgs(args);
    String id = params.get(FlinkParamConstant.ID);
    Asserts.checkNullString(id, "请配置入参 id ");
    // 初始化数据库配置
    DBConfig dbConfig = DBConfig.build(params);
    // 提交
    Submiter.submit(Integer.valueOf(id), dbConfig, params.get(FlinkParamConstant.DINKY_ADDR));
}

从描述,可以看出,最终是使用了Submiter去提交作业。

3.4.2 submit(作业执行入口)

所属模块dlink-app-base

所属类#方法com.dlink.app.flinksql.Submiter#submit


/**
 * 提交任务
 *
 * @param id        任务ID
 * @param dbConfig  数据库连接配置
 * @param dinkyAddr 第三方jar下载,对应对象存储服务器的域名
 * @author : YangLinWei
 * @createTime: 2023/7/15 11:55
 */
public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
    logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
    if (NULL.equals(dinkyAddr)) {
        dinkyAddr = "";
    }
    StringBuilder sb = new StringBuilder();
    // 根据任务ID获取任务配置
    Map<String, String> taskConfig = Submiter.getTaskConfig(id, dbConfig);
    if (Asserts.isNotNull(taskConfig.get("envId"))) {
        String envId = getFlinkSQLStatement(Integer.valueOf(taskConfig.get("envId")), dbConfig);
        if (Asserts.isNotNullString(envId)) {
            sb.append(envId);
        }
        sb.append("\n");
    }
    // 添加数据源全局变量
    sb.append(getDbSourceSqlStatements(dbConfig, id));
    // 添加自定义全局变量信息
    sb.append(getFlinkSQLStatement(id, dbConfig));
    // 拆分SQL字符串为sql集
    List<String> statements = Submiter.getStatements(sb.toString());
    ExecutorSetting executorSetting = ExecutorSetting.build(taskConfig);
    // 加载第三方jar
    loadDep(taskConfig.get("type"), id, dinkyAddr, executorSetting);
    String uuid = UUID.randomUUID().toString().replace("-", "");
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()) + "/" + uuid);
    }
    if (executorSetting.getConfig().containsKey(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) {
        executorSetting.getConfig().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
                executorSetting.getConfig().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()) + "/" + uuid);
    }
    logger.info("作业配置如下: {}", executorSetting);
    // 根据配置,初始化Executor
    Executor executor = Executor.buildAppStreamExecutor(executorSetting);
    List<StatementParam> ddl = new ArrayList<>();
    List<StatementParam> trans = new ArrayList<>();
    List<StatementParam> execute = new ArrayList<>();
    // 遍历执行flink sql
    for (String item : statements) {
        String statement = FlinkInterceptor.pretreatStatement(executor, item);
        if (statement.isEmpty()) {
            continue;
        }
        SqlType operationType = Operations.getOperationType(statement);
        if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
            trans.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else if (operationType.equals(SqlType.EXECUTE)) {
            execute.add(new StatementParam(statement, operationType));
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        } else {
            ddl.add(new StatementParam(statement, operationType));
        }
    }
    // 执行器依次执行flink sql
    for (StatementParam item : ddl) {
        logger.info("正在执行 FlinkSQL: " + item.getValue());
        executor.submitSql(item.getValue());
        logger.info("执行成功");
    }
    if (trans.size() > 0) {
        if (executorSetting.isUseStatementSet()) {
            List<String> inserts = new ArrayList<>();
            for (StatementParam item : trans) {
                if (item.getType().equals(SqlType.INSERT)) {
                    inserts.add(item.getValue());
                }
            }
            logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
            executor.submitStatementSet(inserts);
            logger.info("执行成功");
        } else {
            for (StatementParam item : trans) {
                logger.info("正在执行 FlinkSQL: " + item.getValue());
                executor.submitSql(item.getValue());
                logger.info("执行成功");
                break;
            }
        }
    }
    if (execute.size() > 0) {
        List<String> executes = new ArrayList<>();
        for (StatementParam item : execute) {
            executes.add(item.getValue());
            executor.executeSql(item.getValue());
            if (!executorSetting.isUseStatementSet()) {
                break;
            }
        }
        logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));
        try {
            executor.execute(executorSetting.getJobName());
            logger.info("执行成功");
        } catch (Exception e) {
            logger.error("执行失败, {}", e.getMessage(), e);
        }
    }
    logger.info("{}任务提交成功", LocalDateTime.now());
}

从上述代码,可以得知核心的代码是初始化ExecutorAppStreamExecutor实现)之后,然后依次执行拆分后的flink sql。

3.4.2 submitSql(sql执行入口)

所属模块dlink-executor

所属类#方法com.dlink.executor.AppStreamExecutor


/**
 * Streaming执行器
 *
 * @author : YangLinWei
 * @createTime: 2023/7/15 12:02
 */
public class AppStreamExecutor extends Executor {
    /**
     * 构造函数,初始化flink默认的TableEnvironment
     *
     * @param executorSetting 执行器配置
     */
    public AppStreamExecutor(ExecutorSetting executorSetting) {
        this.executorSetting = executorSetting;
        if (Asserts.isNotNull(executorSetting.getConfig())) {
            Configuration configuration = Configuration.fromMap(executorSetting.getConfig());
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        } else {
            this.environment = StreamExecutionEnvironment.getExecutionEnvironment();
        }
        init();
    }
    /**
     * 公共的逻辑都在Executor ,不同的Executor区别在于这里的TableEnvironment
     *
     * @return 自定义的TableEnvironment
     */
    @Override
    CustomTableEnvironment createCustomTableEnvironment() {
        return CustomTableEnvironmentImpl.create(environment);
    }
}

执行逻辑在基类Executor执行,就是取Executor实现类(AppStreamExecutor)里面的自定义TableEnvironment执行:

继续看看CustomTableEnvironment是如何实现的?

3.4.3 executeSql(执行flink sql)

所属模块dlink-client

所属类#方法com.dlink.executor.CustomTableEnvironmentImpl


这里应该到了flink底层之上的最底层了,有兴趣的同学可以自行阅读,篇幅有限本文不再分析了,总之按flink的标准来实现就好了。

04 总结

最终,整理后的流程如下:

4.1 前端

step1:【dlink-web模块】:dinky-web/src/components/Studio/StudioMenu/index.tsx#submit

  • 描述:提交接口 ,/api/task/submit

4.2 管理端

step1: 【dlink-admin模块】:com.dlink.controller.TaskController#submit

  • 描述:后端controller提交接口

step2: 【dlink-admin模块】:com.dlink.service.impl.TaskServiceImpl#submitTask

  • 描述:提交作业服务

step3: 【dlink-core模块】:com.dlink.job.JobManager#executeJar

  • 描述:作业管理器提交jar作业

step4: 【dlink-gateway模块】:com.dlink.gateway.yarn.YarnApplicationGateway#submitJar

  • 描述:提交客户端提交

step5: 【flink-yarn_xxx源码】:org.apache.flink.yarn.YarnClusterDescriptor#deployApplicationCluster

  • 描述:Flink yarn客户端提交源码

4.3 yarn端

step1: 【dlink-app模块】:com.dlink.app.MainApp#main

  • 描述:执行jar包入口,所有的flink sql作业都在这里开始

step2: 【dlink-app-base模块】:com.dlink.app.flinksql.Submiter#submit

  • 描述:作业提交器,Executor的初始化,并执行

step3: 【dlink-executor模块】:com.dlink.executor.AppStreamExecutor

  • 描述:作业执行器,初始化TableEnvironment,并执行

step4: 【dlink-client模块】:com.dlink.executor.CustomTableEnvironmentImpl

  • 描述:自定义TableEnvironment,实际执行flink sql的逻辑,再进一步就是flink的底层了。

05 文末

本文主要讲解了Dinky的一些概念,以及剖析了它的源码,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
253
分享
相关文章
kde
|
4天前
|
Docker镜像加速指南:手把手教你配置国内镜像源
配置国内镜像源可大幅提升 Docker 拉取速度,解决访问 Docker Hub 缓慢问题。本文详解 Linux、Docker Desktop 配置方法,并提供测速对比与常见问题解答,附最新可用镜像源列表,助力高效开发部署。
kde
2809 7
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
国内如何安装和使用 Claude Code镜像教程 - Windows 用户篇
531 0
Dify MCP 保姆级教程来了!
大语言模型,例如 DeepSeek,如果不能联网、不能操作外部工具,只能是聊天机器人。除了聊天没什么可做的。
773 7
2025年最新版最细致Maven安装与配置指南(任何版本都可以依据本文章配置)
本文详细介绍了Maven的项目管理工具特性、安装步骤和配置方法。主要内容包括: Maven概述:解释Maven作为基于POM的构建工具,具备依赖管理、构建生命周期和仓库管理等功能。 安装步骤: 从官网下载最新版本 解压到指定目录 创建本地仓库文件夹 关键配置: 修改settings.xml文件 配置阿里云和清华大学镜像仓库以加速依赖下载 设置本地仓库路径 附加说明:包含详细的配置示例和截图指导,适用于各种操作系统环境。 本文提供了完整的Maven安装和配置
2025年最新版最细致Maven安装与配置指南(任何版本都可以依据本文章配置)
【保姆级图文详解】大模型、Spring AI编程调用大模型
【保姆级图文详解】大模型、Spring AI编程调用大模型
305 7
【保姆级图文详解】大模型、Spring AI编程调用大模型
Excel数据治理新思路:引入智能体实现自动纠错【Python+Agent】
本文介绍如何利用智能体与Python代码批量处理Excel中的脏数据,解决人工录入导致的格式混乱、逻辑错误等问题。通过构建具备数据校验、异常标记及自动修正功能的系统,将数小时的人工核查任务缩短至分钟级,大幅提升数据一致性和办公效率。
DeepSeek R1+Open WebUI实现本地知识库的搭建和局域网访问
本文介绍了使用 DeepSeek R1 和 Open WebUI 搭建本地知识库的详细步骤与注意事项,涵盖核心组件介绍、硬件与软件准备、模型部署、知识库构建及问答功能实现等内容,适用于本地文档存储、向量化与检索增强生成(RAG)场景的应用开发。
350 0
让AI时代的卓越架构触手可及,阿里云技术解决方案开放免费试用
阿里云推出基于场景的解决方案免费试用活动,新老用户均可领取100点试用点,完成部署还可再领最高100点,相当于一年可获得最高200元云资源。覆盖AI、大数据、互联网应用开发等多个领域,支持热门场景如DeepSeek部署、模型微调等,助力企业和开发者快速验证方案并上云。
295 22
让AI时代的卓越架构触手可及,阿里云技术解决方案开放免费试用
FLUX.1 Kontext 的全生态教程来啦!AIGC专区在线试玩!
Flux.1 Kontext [dev] 开源模型大家都用上了吗?小编汇总了3个使用教程,打包送上!
398 1

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问