数据集成框架FlinkX(纯钧)入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 数据集成框架FlinkX(纯钧)入门

01 FlinkX为何物?

官网地址:https://dtstack.github.io/chunjun/

Githubhttps://github.com/DTStack/chunjun

FlinkX现改名为纯钧 ,它其实就是一款基于Flink 实现 多种异构数据源 之间的数据同步与计算,且 支持流批一体 的开源数据集成框架</u?。


FlinkX将不同的数据库抽象成了 reader/source 插件,writer/sink 插件和lookup 维表插件,它具有以下特点:

  • 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
  • 支持分布式运行,支持flink-standaloneyarn-sessionyarn-per job等多种提交方式;
  • 支持Docker一键部署,支持K8S 部署运行;
  • 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
  • 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
  • 不仅仅支持全量同步,还支持增量同步、间隔轮训;
  • 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
  • 支持脏数据存储,并提供指标监控等;
  • 配合checkpoint实现断点续传;
  • 不仅仅支持同步DML数据,还支持Schema变更同步。

其实它的基于json模板配置有点像DataX,之前博主也写过关于DataX的教程,有兴趣的同学可以参阅《DataX专栏》

ok,接下来,根据教程使用Chunjun来实现MySQL同步到MySQL的功能。

02 使用FlinkX实现MySQL同步至MySQL

2.1 源码编译

首先,我们先clone 纯钧的源码,为了提高下载速度,可以直接在clone gitee的仓库:https://gitee.com/dtstack_dev_0/chunjun.git

clone完成后,可以看到Chunjun的目录结构如下(已备注):

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

编译前提:JDK和MAVEN,此处不再详述。

执行编译命令:

mvn clean package -DskipTests

编译成功:

可以在项目目录下发现所有的资源在output目录:

我们解压看看这个目录有什么内容:

├── bin
│   ├── chunjun-docker.sh
│   ├── chunjun-kubernetes-application.sh
│   ├── chunjun-kubernetes-session.sh
│   ├── chunjun-local.sh
│   ├── chunjun-standalone.sh
│   ├── chunjun-yarn-perjob.sh
│   ├── chunjun-yarn-session.sh
│   ├── start-chunjun
│   └── submit.sh
├── chunjun-dist
│   ├── chunjun-core.jar
│   ├── connector
│   ├── ddl
│   ├── dirty-data-collector
│   ├── docker-build
│   ├── metrics
│   └── restore-plugins
├── chunjun-examples
│   ├── json
│   └── sql
└── lib
    ├── chunjun-clients.jar
    ├── log4j-1.2-api-2.19.0.jar
    ├── log4j-api-2.19.0.jar
    ├── log4j-core-2.19.0.jar
    └── log4j-slf4j-impl-2.19.0.jar

ok,有了以上的内容,可以提交一个任务了。任务提交类型有:local(默认值),standaloneyarn-sessionyarn-per-jobkubernetes-sessionkubernetes-application,具体可以参考ClusterMode类:

那接下来使用local模式来提交。

2.2 提交任务

我们可以直接修改的内容来跑例子(位置“解压目录/chunjun-examples/json/mysql”):

直接修改“mysql_mysql_realtime.json”里面的内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:32306/test?useSSL=false"
                ],
                "table": [
                  "t_user"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:32306/test?useSSL=false",
                "table": [
                  "t_user_copy"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "uniqueKey": ["id"],
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

提交作业:

cd 安装目录/bin
sh chunjun-local.sh -job ../chunjun-examples/json/mysql/mysql_mysql_realtime.json

启动时,会打印:

启动结束后(是不是感觉和DataX很像):

可以看到,已经同步t_user表的数据至t_user_copy表:

03 文末

ok,本文主要简单讲解了FlinkX的简单使用,后续会继续讲解它的原理以及源码。希望能帮助到大家,谢谢大家的阅读,本文完。

目录
相关文章
|
3天前
|
人工智能 达摩院 并行计算
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
VideoRefer 是浙江大学与阿里达摩学院联合推出的视频对象感知与推理技术,支持细粒度视频对象理解、复杂关系分析及多模态交互,适用于视频剪辑、教育、安防等多个领域。
49 17
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
|
1月前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
33 3
|
2月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
49 2
|
3月前
|
Java 程序员 API
Android|集成 slf4j + logback 作为日志框架
做个简单改造,统一 Android APP 和 Java 后端项目打印日志的体验。
155 1
|
4月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
743 2
Flink CDC:新一代实时数据集成框架
|
3月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
233 0
|
4月前
|
监控 Devops 测试技术
DevOps实践: 持续集成和持续部署(CI/CD)的入门指南
【9月更文挑战第10天】在快速迭代的软件开发世界中,DevOps已经成为加速产品交付、提升软件质量和团队协作的关键策略。本文将深入浅出地介绍DevOps的核心组成部分——持续集成(Continuous Integration, CI)与持续部署(Continuous Deployment, CD)的基本概念、实施步骤以及它们如何革新传统的软件开发流程。你将学习到如何通过自动化工具简化开发流程,并理解为什么CI/CD是现代软件开发不可或缺的一环。
|
5月前
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
75 0
|
5月前
|
测试技术 持续交付 开发者
Xamarin 高效移动应用测试最佳实践大揭秘,从框架选择到持续集成,让你的应用质量无敌!
【8月更文挑战第31天】竞争激烈的移动应用市场,Xamarin 作为一款优秀的跨平台开发工具,提供了包括单元测试、集成测试及 UI 测试在内的全面测试方案。借助 Xamarin.UITest 框架,开发者能便捷地用 C# 编写测试案例,如登录功能测试;通过 Xamarin 模拟框架,则可在无需真实设备的情况下模拟各种环境测试应用表现;Xamarin.TestCloud 则支持在真实设备上执行自动化测试,确保应用兼容性。结合持续集成与部署策略,进一步提升测试效率与应用质量。掌握 Xamarin 的测试最佳实践,对确保应用稳定性和优化用户体验至关重要。
75 0
|
5月前
|
数据库 Java 数据库连接
Struts 2 与 Hibernate 的完美邂逅:如何无缝集成两大框架,轻松玩转高效 CRUD 操作?
【8月更文挑战第31天】本文通过具体示例介绍了如何在 Struts 2 中整合 Hibernate,实现基本的 CRUD 操作。首先创建 Maven 项目并添加相关依赖,接着配置 Hibernate 并定义实体类及其映射文件。然后创建 DAO 接口及实现类处理数据库操作,再通过 Struts 2 的 Action 类处理用户请求。最后配置 `struts.xml` 文件并创建 JSP 页面展示用户列表及编辑表单。此示例展示了如何配置和使用这两个框架,使代码更加模块化和可维护。
179 0