数据集成框架FlinkX(纯钧)入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: 数据集成框架FlinkX(纯钧)入门

01 FlinkX为何物?

官网地址:https://dtstack.github.io/chunjun/

Githubhttps://github.com/DTStack/chunjun

FlinkX现改名为纯钧 ,它其实就是一款基于Flink 实现 多种异构数据源 之间的数据同步与计算,且 支持流批一体 的开源数据集成框架</u?。


FlinkX将不同的数据库抽象成了 reader/source 插件,writer/sink 插件和lookup 维表插件,它具有以下特点:

  • 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
  • 支持分布式运行,支持flink-standaloneyarn-sessionyarn-per job等多种提交方式;
  • 支持Docker一键部署,支持K8S 部署运行;
  • 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
  • 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
  • 不仅仅支持全量同步,还支持增量同步、间隔轮训;
  • 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
  • 支持脏数据存储,并提供指标监控等;
  • 配合checkpoint实现断点续传;
  • 不仅仅支持同步DML数据,还支持Schema变更同步。

其实它的基于json模板配置有点像DataX,之前博主也写过关于DataX的教程,有兴趣的同学可以参阅《DataX专栏》

ok,接下来,根据教程使用Chunjun来实现MySQL同步到MySQL的功能。

02 使用FlinkX实现MySQL同步至MySQL

2.1 源码编译

首先,我们先clone 纯钧的源码,为了提高下载速度,可以直接在clone gitee的仓库:https://gitee.com/dtstack_dev_0/chunjun.git

clone完成后,可以看到Chunjun的目录结构如下(已备注):

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

编译前提:JDK和MAVEN,此处不再详述。

执行编译命令:

mvn clean package -DskipTests

编译成功:

可以在项目目录下发现所有的资源在output目录:

我们解压看看这个目录有什么内容:

├── bin
│   ├── chunjun-docker.sh
│   ├── chunjun-kubernetes-application.sh
│   ├── chunjun-kubernetes-session.sh
│   ├── chunjun-local.sh
│   ├── chunjun-standalone.sh
│   ├── chunjun-yarn-perjob.sh
│   ├── chunjun-yarn-session.sh
│   ├── start-chunjun
│   └── submit.sh
├── chunjun-dist
│   ├── chunjun-core.jar
│   ├── connector
│   ├── ddl
│   ├── dirty-data-collector
│   ├── docker-build
│   ├── metrics
│   └── restore-plugins
├── chunjun-examples
│   ├── json
│   └── sql
└── lib
    ├── chunjun-clients.jar
    ├── log4j-1.2-api-2.19.0.jar
    ├── log4j-api-2.19.0.jar
    ├── log4j-core-2.19.0.jar
    └── log4j-slf4j-impl-2.19.0.jar

ok,有了以上的内容,可以提交一个任务了。任务提交类型有:local(默认值),standaloneyarn-sessionyarn-per-jobkubernetes-sessionkubernetes-application,具体可以参考ClusterMode类:

那接下来使用local模式来提交。

2.2 提交任务

我们可以直接修改的内容来跑例子(位置“解压目录/chunjun-examples/json/mysql”):

直接修改“mysql_mysql_realtime.json”里面的内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:32306/test?useSSL=false"
                ],
                "table": [
                  "t_user"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:32306/test?useSSL=false",
                "table": [
                  "t_user_copy"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "uniqueKey": ["id"],
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

提交作业:

cd 安装目录/bin
sh chunjun-local.sh -job ../chunjun-examples/json/mysql/mysql_mysql_realtime.json

启动时,会打印:

启动结束后(是不是感觉和DataX很像):

可以看到,已经同步t_user表的数据至t_user_copy表:

03 文末

ok,本文主要简单讲解了FlinkX的简单使用,后续会继续讲解它的原理以及源码。希望能帮助到大家,谢谢大家的阅读,本文完。

目录
相关文章
|
5天前
|
监控 负载均衡 Java
Spring Boot与微服务治理框架的集成
Spring Boot与微服务治理框架的集成
|
5天前
|
存储 Java 数据中心
Spring Boot与微服务治理框架的集成成功案例
Spring Boot与微服务治理框架的集成成功案例
|
11天前
|
人工智能 搜索推荐 安全
移动应用开发的未来趋势:跨平台框架和AI集成
【6月更文挑战第26天】在移动应用开发的快速演变领域中,开发者面临着不断变化的挑战与机遇。本文将探讨未来移动应用开发的几个关键趋势,包括跨平台框架的兴起、人工智能(AI)技术的集成,以及这些技术如何影响应用的性能、安全性和用户体验。通过分析当前的技术进步,我们预见到移动应用将更加智能、响应迅速且无缝集成于用户日常生活中。
|
17天前
|
人工智能 开发框架 前端开发
探索移动应用开发的未来:从跨平台框架到人工智能集成
【6月更文挑战第20天】随着移动设备的普及,移动应用开发领域不断演进,涌现出多种创新技术和工具。本文将深入探讨跨平台开发框架的兴起、人工智能在移动应用中的集成以及未来移动操作系统的发展趋势。我们将分析Flutter和React Native等流行框架如何简化开发流程,同时考察AI技术如何提升用户体验。此外,文章还将预测移动操作系统的发展方向,为开发者提供前瞻性的见解和建议。
27 3
|
21天前
|
SQL Oracle 关系型数据库
一文入门Dataphin实时集成
Dataphin实时集成的读取和写入原理是什么?Dataphin实时集成和实时研发的区别是什么?Dataphin实时集成有哪些优势?本文一次讲清
|
5天前
|
负载均衡 监控 Java
Spring Boot与微服务治理框架的集成方法
Spring Boot与微服务治理框架的集成方法
|
13天前
|
边缘计算 Cloud Native IDE
“论SOA在企业集成架构设计中的应用”写作框架,系统架构设计师
企业应用集成(Enterprise Application Integration, EAI)是每个企业都必须要面对的实际问题。面向服务的企业应用集成是一种基于面向服务体系结构(Service-OrientedArchitecture,SOA)的新型企业应用集成技术,强调将企业和组织内部的资源和业务功能暴露为服务,实现资源共享和系统之间的互操作性,并支持快速地将新的应用以服务的形式加入到已有的集成环境中,增强企业IT环境的灵活性。
|
2月前
|
编解码 人工智能
DiT架构大一统:一个框架集成图像、视频、音频和3D生成,可编辑、能试玩
【5月更文挑战第23天】研究人员提出Lumina-T2X框架,统一生成和编辑图像、视频、音频及3D内容。使用Flow-based Large Diffusion Transformer (Flag-DiT)模型,实现多模态生成,支持内容编辑。尽管面临训练资源需求高、生成质量不及人类创作等问题,该框架在娱乐、广告等领域有广泛应用潜力。[论文链接](https://arxiv.org/pdf/2405.05945)
50 1
|
2月前
|
Dart 前端开发 测试技术
移动应用开发的未来:跨平台框架与原生系统的融合深入理解软件测试中的持续集成与持续部署(CI/CD)
【5月更文挑战第30天】 在本文中,我们将深入探讨移动应用开发领域的最新趋势:跨平台开发框架与原生操作系统的融合。随着移动设备成为日常生活的核心,高效、灵活且性能卓越的应用程序需求日益增长。文章分析了当前主流的跨平台工具如React Native和Flutter,并探讨了它们如何与iOS和Android等原生系统相互作用,以及这种融合对开发者、用户和整个移动生态系统意味着什么。我们还将预测未来可能的技术发展,并提出相应的策略建议。