深度剖析Dinky源码(上)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 深度剖析Dinky源码

01 引言

最近刚好看到一款 Flink 二次开发的开源框架 Dinky,简单看了其官网的描述,似乎功能很全,经过拆分项目,最终本地跑起来了,运行如下:

前端 后端

浏览器登录之后如下:

使用了一阵,发现了有不少的bug,点击几下就报错了,但是问题不大,本着学习的态度,主要学习其核心的功能点,本文以Dinky任务提交的流程分析一下其源码。

02 目录结构

参考的文档:http://www.dlink.top/docs/0.7/developer_guide/local_debug

经过整理之后,Dinky后端的目录结构如下:

├ dlink 父项目
├── dlink-admin 【管理中心】:标准的 SpringBoot 应用,负责与前端 react 交互
├── dlink-alert 【告警中心】:集成钉钉 、企业微信 、飞书 、邮箱
│   ├── dlink-alert-base
│   ├── dlink-alert-dingtalk
│   ├── dlink-alert-email
│   ├── dlink-alert-feishu
├── dlink-app 【flink entrypoint jar】:Dlink 在 Yarn Application 模式所使用的简化解析包
│   ├── dlink-app-1.11
│   ├── dlink-app-1.12
│   ├── dlink-app-1.13
│   ├── dlink-app-1.14
│   ├── dlink-app-1.15
│   ├── dlink-app-1.16
│   ├── dlink-app-1.17
│   └── dlink-app-base
├── dlink-assembly 【打包配置】:最终 tar.gz 的打包内容
├── dlink-catalog
│   └── dlink-catalog-mysql
│       ├── dlink-catalog-mysql-1.13
│       ├── dlink-catalog-mysql-1.14
│       ├── dlink-catalog-mysql-1.15
│       ├── dlink-catalog-mysql-1.16
│       └── dlink-catalog-mysql-1.17
├── dlink-client  【Client中心】用来桥接 Dlink 与不同版本的 Flink 运行环境
│   ├── dlink-client-1.11
│   ├── dlink-client-1.12
│   ├── dlink-client-1.13
│   ├── dlink-client-1.14
│   ├── dlink-client-1.15
│   ├── dlink-client-1.16
│   ├── dlink-client-1.17
│   ├── dlink-client-base
│   └── dlink-client-hadoop
├── dlink-common 【公用模块】
├── dlink-connectors 【自定义connector】
│   ├── dlink-connector-doris-1.13
│   ├── dlink-connector-jdbc-1.11
│   ├── dlink-connector-jdbc-1.12
│   ├── dlink-connector-jdbc-1.13
│   ├── dlink-connector-jdbc-1.14
│   ├── dlink-connector-phoenix-1.13
│   ├── dlink-connector-phoenix-1.14
│   └── dlink-connector-pulsar-1.14
├── dlink-core 【执行中心】
├── dlink-daemon 【任务监听线程】
├── dlink-doc 【打包资源】:包含启动脚本、sql脚本、配置文件等
├── dlink-executor 【执行模块】:从 dlink-core 中拆分出来,内含最核心的 Executor、Interceptor、Operation 等实现
├── dlink-flink 【flink运行时依赖的环境】
│   ├── dlink-flink-1.11
│   ├── dlink-flink-1.12
│   ├── dlink-flink-1.13
│   ├── dlink-flink-1.14
│   ├── dlink-flink-1.15
│   ├── dlink-flink-1.16
│   └── dlink-flink-1.17
├── dlink-function 【自定义函数】
├── dlink-gateway【任务网关】:负责把实现不同执行模式的任务提交与管理,目前主要包含 Yarn PerJob 和 Application
├── dlink-metadata 【元数据中心】:用于实现各种外部数据源对接到 Dlink,以此使用其各种查询、执行等能力,未来用于 Flink Catalog 的预装载等。
│   ├── dlink-metadata-base
│   ├── dlink-metadata-clickhouse
│   ├── dlink-metadata-doris
│   ├── dlink-metadata-hive
│   ├── dlink-metadata-mysql
│   ├── dlink-metadata-oracle
│   ├── dlink-metadata-phoenix
│   ├── dlink-metadata-postgresql
│   ├── dlink-metadata-presto
│   ├── dlink-metadata-sqlserver
│   └── dlink-metadata-starrocks
├── dlink-process 【流程管理?】
├── dlink-scheduler 【应该是对接DolphinScheduler的】
├── docker 【相关dockerfile脚本】
│   ├── mysql
│   ├── server
│   └── web
├── docs 【相关文档】
└── dlink-web 【dlink前端,React框架】

在了解完目录结构后,接下来可以进入本文的核心部分,一起去分析Dinky的作业提交流程。

03 源码分析

http://www.dlink.top/docs/0.7/developer_guide/local_debug

在进行源码分析前,有必要看看官网贴出的,在不同执行模式下,任务提交的流程。这里复制官网所说的任务执行路线:

Local模式

同步执行/异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> LocalStreamExecutor 
==> CustomTableEnvironmentImpl 
==> LocalEnvironment

Standalone模式

注册集群实例 
==> 同步执行/异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> RemoteStreamExecutor 
==> CustomTableEnvironmentImpl 
==> RemoteEnvironment 
==> JobGraph 
==> Flink Standalone Cluster

Yarn Session模式

注册集群实例 
==> 同步执行/异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> RemoteStreamExecutor 
==> CustomTableEnvironmentImpl 
==> RemoteEnvironment 
==> JobGraph 
==> Flink Yarn Session Cluster

Yarn Per-Job模式

注册集群配置 
==> 异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> JobGraph 
==> Gateway 
==> YarnPerJobGateway
==> YarnClient 
==> Flink Yarn Per-Job Cluster

Yarn Application模式

注册集群配置 
==> 异步提交 
==> StudioService 
==> JobManager 
==> Executor 
==> TaskId & JDBC 
==> Gateway 
==> YarnApplicationGateway
==> YarnClient 
==> dlink-app.jar 
==> Executor 
==> AppStreamExecutor 
==> CustomTableEnvironmentImpl 
==> LocalEnvironmentFlink Yarn Application Cluster

Yarn Application 模式是我们日常开发中用的最多的,我们以此为例子,看看整个流程。

3.1 前端

前端的操作流程大致如下,图片里已有描述:

我们最终需要知道的是提交的内容,浏览器右键的cURL内容如下:

curl 'http://localhost:8000/api/task/submit' \
  -H 'Accept: application/json' \
  -H 'Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7' \
  -H 'Connection: keep-alive' \
  -H 'Content-Type: application/json;charset=UTF-8' \
  -H 'Cookie: tenantId=1; satoken=8733fc5d-c12b-497a-9e56-ad1991e2eef3' \
  -H 'Origin: http://localhost:8000' \
  -H 'Referer: http://localhost:8000/' \
  -H 'Sec-Fetch-Dest: empty' \
  -H 'Sec-Fetch-Mode: cors' \
  -H 'Sec-Fetch-Site: same-origin' \
  -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' \
  -H 'sec-ch-ua: "Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"' \
  -H 'sec-ch-ua-mobile: ?0' \
  -H 'sec-ch-ua-platform: "macOS"' \
  -H 'tenantId: 1' \
  --data-raw '{"0":2}' \
  --compressed

3.2 管理端

3.2.1 submitTask (提交的接口)

所属模块dlink-admin

所属类#方法com.dlink.controller.TaskController#submit


代码如下(含注释):

/**
 * 任务提交接口
 *
 * @param para 提交参数(任务ID数组)
 * @return 提交结果
 * @author : YangLinWei
 * @createTime: 2023/7/14 16:24
 */
@PostMapping(value = "/submit")
public Result submit(@RequestBody JsonNode para) throws Exception {
    if (para.size() > 0) {
        List<JobResult> results = new ArrayList<>();
        List<Integer> error = new ArrayList<>();
        for (final JsonNode item : para) {
            Integer id = item.asInt();
            // 提交任务
            JobResult result = taskService.submitTask(id);
            if (!result.isSuccess()) {
                error.add(id);
            }
            results.add(result);
        }
        if (error.size() == 0) {
            return Result.succeed(results, "执行成功");
        } else {
            return Result.succeed(results, "执行部分成功,但" + error.toString() + "执行失败,共" + error.size() + "次失败。");
        }
    } else {
        return Result.failed("请选择要执行的记录");
    }
}

3.2.2 submitTask(提交的主体流程)

所属模块dlink-admin

所属类#方法com.dlink.service.impl.TaskServiceImpl#submitTask


可以看到里面使用taskService来提交任务,继续进入com.dlink.service.impl.TaskServiceImpl#submitTask,可以看到代码如下(已写注释):

/**
 * 提交作业任务
 *
 * @param id 任务ID
 * @return 任务结果
 * @author : YangLinWei
 * @createTime: 2023/7/14 16:24
 */
@Override
public JobResult submitTask(Integer id) {
    // 1. 获取任务信息
    Task task = this.getTaskInfoById(id);
    Asserts.checkNull(task, Tips.TASK_NOT_EXIST);
    // 2. 判断是否为flink任务
    if (Dialect.notFlinkSql(task.getDialect())) {
        // 2.1 如果不是flink任务,执行普通的sql(本文不分析)
        return executeCommonSql(SqlDTO.build(task.getStatement(),
                task.getDatabaseId(), null));
    }
    // 3. 初始化进程实例(主要用作控制台显示日志)
    ProcessEntity process = null;
    if (StpUtil.isLogin()) {
        // 3.1 如果用户已经登录,设置process进入ProcessContextHolder上下文,下次用户登录进来就可以继续看日志了
        process = ProcessContextHolder.registerProcess(
                ProcessEntity.init(ProcessType.FLINKSUBMIT, StpUtil.getLoginIdAsInt()));
    } else {
        // 3.2 如果没登录,则则直接打印
        process = ProcessEntity.NULL_PROCESS;
    }
    // 4. 根据任务,配置flink 作业配置
    process.info("Initializing Flink job config...");
    JobConfig config = buildJobConfig(task);
    // 5. 判断执行模式是否为 kubernetes-application
    if (GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
        // 5.1 构建并加载Docker镜像
        loadDocker(id, config.getClusterConfigurationId(), config.getGatewayConfig());
    }
    // 6. 构建JobManager
    JobManager jobManager = JobManager.build(config);
    // 7. 执行流程记录(start)
    process.start();
    if (!config.isJarTask()) {
        // 如果不是jar任务,JobManager 提交执行flink sql
        JobResult jobResult = jobManager.executeSql(task.getStatement());
        process.finish("Submit Flink SQL finished.");
        return jobResult;
    } else {
        // 如果是jar任务,JobManager 提交执行执行jar
        JobResult jobResult = jobManager.executeJar();
        process.finish("Submit Flink Jar finished.");
        return jobResult;
    }
}

上述的代码不多,但是有很多逻辑,我们主要分析flink jar 的流程,其实可以精简的理解为:

==> getTaskInfoById(taskId):查询任务的配置,以及关联的作业实例
==> buildJobConfig(Task task):根据任务信息构造flink作业配置(例如:执行jar的路径、savepoint配置等,也就是提交界面右侧的作业配置)
==> JobManager.build(config):根据作业配置,初始化JobManager,主要是初始化了提交作业的GateWay配置、初始化JobHandler(Job2MysqlHandler)、构造Executor(LocalStreamExecutor)
==> jobManager.executeJar():执行jar作业

从上述的流程描述,可以看到执行jar作业的核心代码在:com.dlink.job.JobManager#executeJar

3.2.3 executeJar (执行jar作业)

所属模块dlink-core

所属类#方法com.dlink.job.JobManager#executeJar


执行作业的代码流程如下:

/**
 * 执行jar作业
 *
 * @return 作业提交结果
 * @author : YangLinWei
 * @createTime: 2023/7/14 16:24
 */
public JobResult executeJar() {
    // 从上下文获取流程实例(主要用作记录打印)
    ProcessEntity process = ProcessContextHolder.getProcess();
    // 初始化作业配置
    Job job = Job.init(runMode, config, executorSetting, executor, null, useGateway);
    // 把当前作业设置进上下文
    JobContextHolder.setJob(job);
    // 准备提交的工作(主要是执行JobHandler的init方法)
    ready();
    try {
        // 构造GateWay(YarnApplicationGateWay),并提交Jar
        GatewayResult gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar();
        // 下面的逻辑便是设置作业提交后的结果,并返回给前端
        job.setResult(InsertResult.success(gatewayResult.getAppId()));
        job.setJobId(gatewayResult.getAppId());
        job.setJids(gatewayResult.getJids());
        job.setJobManagerAddress(formatAddress(gatewayResult.getWebURL()));
        job.setEndTime(LocalDateTime.now());
        if (gatewayResult.isSucess()) {
            job.setStatus(Job.JobStatus.SUCCESS);
            success();
        } else {
            job.setError(gatewayResult.getError());
            job.setStatus(Job.JobStatus.FAILED);
            failed();
        }
    } catch (Exception e) {
        String error = LogUtil.getError(
                "Exception in executing Jar:\n" + config.getGatewayConfig().getAppConfig().getUserJarPath(), e);
        job.setEndTime(LocalDateTime.now());
        job.setStatus(Job.JobStatus.FAILED);
        job.setError(error);
        failed();
        process.error(error);
    } finally {
        close();
    }
    return job.getJobResult();
}

从上述的代码可以看到,用到了两个核心的类,分别是JobHandlerGateWay

3.3.3 JobHandler (作业处理器)

所属模块dlink-core

所属类#方法com.dlink.job.JobHandler#ready()


从上述代码,可以知道上面的JobHandlerJob2MysqlHandler实现)记录了执行的流程,依次如下:

ready() => success()/failed()

其中接口如下:

/**
 * 作业生命周期处理器
 *
 * @author : YangLinWei
 * @createTime: 2023/7/14 23:31
 * @version: 1.0.0
 */
public interface JobHandler {
    /**
     * 初始化(记录作业历史等)
     */
    boolean init();
    /**
     * 准备提交前出发的操作
目录
相关文章
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
708 3
|
数据采集 分布式计算 Hadoop
开源数据质量解决方案——Apache Griffin入门宝典(上)
开源数据质量解决方案——Apache Griffin入门宝典
2374 0
|
10月前
|
SQL API Apache
Dinky 和 Flink CDC 在实时整库同步的探索之路
本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
1182 12
Dinky 和 Flink CDC 在实时整库同步的探索之路
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
4868 3
Flink CDC:新一代实时数据集成框架
|
运维 关系型数据库 MySQL
安装MySQL8数据库
本文介绍了MySQL的不同版本及其特点,并详细描述了如何通过Yum源安装MySQL 8.4社区版,包括配置Yum源、安装MySQL、启动服务、设置开机自启动、修改root用户密码以及设置远程登录等步骤。最后还提供了测试连接的方法。适用于初学者和运维人员。
905 0
|
SQL 资源调度 前端开发
深度剖析Dinky源码(下)
深度剖析Dinky源码(下)
653 0
|
SQL 存储 关系型数据库
|
SQL 前端开发 Java
迄今为止最好用的Flink SQL教程:Flink SQL Cookbook on Zeppelin
无需写任何代码,只要照着这篇文章轻松几步就能跑各种类型的 Flink SQL 语句。
迄今为止最好用的Flink SQL教程:Flink SQL Cookbook on Zeppelin
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用合集之通过scan.incremental.snapshot.chunk.key-column参数配置来处理无主键表的全量同步,增量数据进不来的原因是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
域名解析 分布式计算 资源调度
在文件存储 HDFS 上使用 Apache Flink
本文主要为大家介绍在文件存储HDFS上搭建及使用Apache Flink的方法。
2096 0
在文件存储 HDFS 上使用 Apache Flink