01 FlinkX为何物?
FlinkX现改名为纯钧 ,它其实就是一款基于Flink 实现 多种异构数据源 之间的数据同步与计算,且 支持流批一体 的开源数据集成框架</u?。
FlinkX将不同的数据库抽象成了 reader/source
插件,writer/sink
插件和lookup
维表插件,它具有以下特点:
- 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
- 支持分布式运行,支持
flink-standalone
、yarn-session
、yarn-per job
等多种提交方式; - 支持
Docker
一键部署,支持K8S
部署运行; - 支持多种异构数据源,可支持
MySQL、Oracle、SQLServer、Hive、Kudu
等20多种数据源的同步与计算; - 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
- 不仅仅支持全量同步,还支持增量同步、间隔轮训;
- 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
- 支持脏数据存储,并提供指标监控等;
- 配合
checkpoint
实现断点续传; - 不仅仅支持同步
DML
数据,还支持Schema
变更同步。
其实它的基于json
模板配置有点像DataX
,之前博主也写过关于DataX
的教程,有兴趣的同学可以参阅《DataX专栏》。
ok,接下来,根据教程使用Chunjun来实现MySQL同步到MySQL的功能。
02 使用FlinkX实现MySQL同步至MySQL
2.1 源码编译
首先,我们先clone 纯钧的源码,为了提高下载速度,可以直接在clone gitee的仓库:https://gitee.com/dtstack_dev_0/chunjun.git。
clone
完成后,可以看到Chunjun
的目录结构如下(已备注):
- bin # 存放执行脚本的目录 ├── chunjun-docker.sh # Docker 启动脚本 ├── chunjun-kubernetes-application.sh # Kubernetes 应用模式启动脚本 ├── chunjun-kubernetes-session.sh # Kubernetes 会话模式启动脚本 ├── chunjun-local.sh # 本地启动脚本 ├── chunjun-standalone.sh # 单机模式启动脚本 ├── chunjun-yarn-perjob.sh # YARN 每作业模式启动脚本 ├── chunjun-yarn-session.sh # YARN 会话模式启动脚本 ├── start-chunjun # 通用启动脚本 └── submit.sh # 提交任务脚本 - build # 构建脚本目录 └── build.sh # 构建脚本 - chunjun-assembly # 汇总装配模块目录 - chunjun-clients # 客户端模块目录 - chunjun-connectors # 连接器模块目录 ├── (多个子目录) # 不同的数据连接器子模块 - chunjun-core # 核心模块目录 - chunjun-ddl # 数据定义语言模块目录 ├── chunjun-ddl-base # DDL 基础模块 ├── chunjun-ddl-mysql # MySQL DDL 模块 ├── chunjun-ddl-oracle # Oracle DDL 模块 - chunjun-dev # 开发工具模块目录 ├── (多个子目录) # 包含开发用的各种工具和资源 - chunjun-dirty # 脏数据处理模块目录 ├── (多个子目录) # 不同的脏数据处理子模块 - chunjun-docker # Docker 相关模块目录 ├── (多个子目录) # Docker 相关资源和配置 - chunjun-e2e # 端到端测试模块目录 - chunjun-examples # 示例模块目录 ├── json # JSON 示例 └── sql # SQL 示例 - chunjun-local-test # 本地测试模块目录 - chunjun-metrics # 指标监控模块目录 ├── (多个子目录) # 包含不同的监控模块 - chunjun-restore # 数据恢复模块目录 ├── chunjun-restore-common # 通用数据恢复模块 └── chunjun-restore-mysql # MySQL 数据恢复模块
编译前提:JDK和MAVEN,此处不再详述。
执行编译命令:
mvn clean package -DskipTests
编译成功:
可以在项目目录下发现所有的资源在output目录:
我们解压看看这个目录有什么内容:
├── bin │ ├── chunjun-docker.sh │ ├── chunjun-kubernetes-application.sh │ ├── chunjun-kubernetes-session.sh │ ├── chunjun-local.sh │ ├── chunjun-standalone.sh │ ├── chunjun-yarn-perjob.sh │ ├── chunjun-yarn-session.sh │ ├── start-chunjun │ └── submit.sh ├── chunjun-dist │ ├── chunjun-core.jar │ ├── connector │ ├── ddl │ ├── dirty-data-collector │ ├── docker-build │ ├── metrics │ └── restore-plugins ├── chunjun-examples │ ├── json │ └── sql └── lib ├── chunjun-clients.jar ├── log4j-1.2-api-2.19.0.jar ├── log4j-api-2.19.0.jar ├── log4j-core-2.19.0.jar └── log4j-slf4j-impl-2.19.0.jar
ok,有了以上的内容,可以提交一个任务了。任务提交类型有:local
(默认值),standalone
,yarn-session
,yarn-per-job
,kubernetes-session
,kubernetes-application
,具体可以参考ClusterMode类:
那接下来使用local
模式来提交。
2.2 提交任务
我们可以直接修改的内容来跑例子(位置“解压目录/chunjun-examples/json/mysql”):
直接修改“mysql_mysql_realtime.json”里面的内容:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "age", "type": "int" } ], "customSql": "", "where": "id < 1000", "splitPk": "id", "startLocation": "2", "polling": true, "pollingInterval": 3000, "queryTimeOut": 1000, "username": "root", "password": "root", "connection": [ { "jdbcUrl": [ "jdbc:mysql://127.0.0.1:32306/test?useSSL=false" ], "table": [ "t_user" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "root", "connection": [ { "jdbcUrl": "jdbc:mysql://127.0.0.1:32306/test?useSSL=false", "table": [ "t_user_copy" ] } ], "writeMode": "insert", "flushIntervalMills":"3000", "uniqueKey": ["id"], "column": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "age", "type": "int" } ] } } } ], "setting": { "restore": { "restoreColumnName": "id" }, "speed": { "channel": 1, "bytes": 0 } } } }
提交作业:
cd 安装目录/bin sh chunjun-local.sh -job ../chunjun-examples/json/mysql/mysql_mysql_realtime.json
启动时,会打印:
启动结束后(是不是感觉和DataX很像):
可以看到,已经同步t_user表的数据至t_user_copy表:
03 文末
ok,本文主要简单讲解了FlinkX的简单使用,后续会继续讲解它的原理以及源码。希望能帮助到大家,谢谢大家的阅读,本文完。