数据集成框架FlinkX(纯钧)入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 数据集成框架FlinkX(纯钧)入门

01 FlinkX为何物?

官网地址:https://dtstack.github.io/chunjun/

Githubhttps://github.com/DTStack/chunjun

FlinkX现改名为纯钧 ,它其实就是一款基于Flink 实现 多种异构数据源 之间的数据同步与计算,且 支持流批一体 的开源数据集成框架</u?。


FlinkX将不同的数据库抽象成了 reader/source 插件,writer/sink 插件和lookup 维表插件,它具有以下特点:

  • 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
  • 支持分布式运行,支持flink-standaloneyarn-sessionyarn-per job等多种提交方式;
  • 支持Docker一键部署,支持K8S 部署运行;
  • 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
  • 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
  • 不仅仅支持全量同步,还支持增量同步、间隔轮训;
  • 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
  • 支持脏数据存储,并提供指标监控等;
  • 配合checkpoint实现断点续传;
  • 不仅仅支持同步DML数据,还支持Schema变更同步。

其实它的基于json模板配置有点像DataX,之前博主也写过关于DataX的教程,有兴趣的同学可以参阅《DataX专栏》

ok,接下来,根据教程使用Chunjun来实现MySQL同步到MySQL的功能。

02 使用FlinkX实现MySQL同步至MySQL

2.1 源码编译

首先,我们先clone 纯钧的源码,为了提高下载速度,可以直接在clone gitee的仓库:https://gitee.com/dtstack_dev_0/chunjun.git

clone完成后,可以看到Chunjun的目录结构如下(已备注):

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

编译前提:JDK和MAVEN,此处不再详述。

执行编译命令:

mvn clean package -DskipTests

编译成功:

可以在项目目录下发现所有的资源在output目录:

我们解压看看这个目录有什么内容:

├── bin
│   ├── chunjun-docker.sh
│   ├── chunjun-kubernetes-application.sh
│   ├── chunjun-kubernetes-session.sh
│   ├── chunjun-local.sh
│   ├── chunjun-standalone.sh
│   ├── chunjun-yarn-perjob.sh
│   ├── chunjun-yarn-session.sh
│   ├── start-chunjun
│   └── submit.sh
├── chunjun-dist
│   ├── chunjun-core.jar
│   ├── connector
│   ├── ddl
│   ├── dirty-data-collector
│   ├── docker-build
│   ├── metrics
│   └── restore-plugins
├── chunjun-examples
│   ├── json
│   └── sql
└── lib
    ├── chunjun-clients.jar
    ├── log4j-1.2-api-2.19.0.jar
    ├── log4j-api-2.19.0.jar
    ├── log4j-core-2.19.0.jar
    └── log4j-slf4j-impl-2.19.0.jar

ok,有了以上的内容,可以提交一个任务了。任务提交类型有:local(默认值),standaloneyarn-sessionyarn-per-jobkubernetes-sessionkubernetes-application,具体可以参考ClusterMode类:

那接下来使用local模式来提交。

2.2 提交任务

我们可以直接修改的内容来跑例子(位置“解压目录/chunjun-examples/json/mysql”):

直接修改“mysql_mysql_realtime.json”里面的内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:32306/test?useSSL=false"
                ],
                "table": [
                  "t_user"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:32306/test?useSSL=false",
                "table": [
                  "t_user_copy"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "uniqueKey": ["id"],
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

提交作业:

cd 安装目录/bin
sh chunjun-local.sh -job ../chunjun-examples/json/mysql/mysql_mysql_realtime.json

启动时,会打印:

启动结束后(是不是感觉和DataX很像):

可以看到,已经同步t_user表的数据至t_user_copy表:

03 文末

ok,本文主要简单讲解了FlinkX的简单使用,后续会继续讲解它的原理以及源码。希望能帮助到大家,谢谢大家的阅读,本文完。

目录
相关文章
|
5月前
|
API Apache 数据库
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 于 2023 年 12 月 7 日重磅推出了其全新的 3.0 版本 ~
100434 7
 Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
|
5月前
|
Java 测试技术 数据库连接
秒级启动的集成测试框架
秒级启动的集成测试框架
|
4天前
|
Java Spring
Spring Boot脚手架集成校验框架
Spring Boot脚手架集成校验框架
11 0
|
12天前
|
监控 测试技术 数据安全/隐私保护
如何将代理IP集成到自动化测试框架中?
如何将代理IP集成到自动化测试框架中?
|
2月前
|
SQL API 数据处理
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
本文整理自阿里云开源大数据平台吕宴全关于新一代实时数据集成框架 Flink CDC 3.0 的核心技术架构解析。
749 0
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
|
4月前
|
机器学习/深度学习 PyTorch TensorFlow
iOS设备功能和框架: 什么是 Core ML?如何在应用中集成机器学习模型?
iOS设备功能和框架: 什么是 Core ML?如何在应用中集成机器学习模型?
28 0
|
4月前
|
API Apache 数据库
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
218 0
|
5月前
|
流计算
电子好书发您分享《Flink CDC:新一代数据集成框架》
电子好书发您分享《Flink CDC:新一代数据集成框架》
153 2
|
5月前
|
SQL druid Java
三步实现maven工程集成logback日志框架(日志按天滚动生成文件)并附源码
三步实现maven工程集成logback日志框架(日志按天滚动生成文件)并附源码
83 0
|
5月前
|
流计算
电子好书分享《Flink CDC:新一代数据集成框架》
电子好书分享《Flink CDC:新一代数据集成框架》
314 1
电子好书分享《Flink CDC:新一代数据集成框架》