数据集成框架FlinkX(纯钧)入门

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 数据集成框架FlinkX(纯钧)入门

01 FlinkX为何物?

官网地址:https://dtstack.github.io/chunjun/

Githubhttps://github.com/DTStack/chunjun

FlinkX现改名为纯钧 ,它其实就是一款基于Flink 实现 多种异构数据源 之间的数据同步与计算,且 支持流批一体 的开源数据集成框架</u?。


FlinkX将不同的数据库抽象成了 reader/source 插件,writer/sink 插件和lookup 维表插件,它具有以下特点:

  • 基于实时计算引擎Flink,支持JSON模版配置任务,兼容Flink SQL语法;
  • 支持分布式运行,支持flink-standaloneyarn-sessionyarn-per job等多种提交方式;
  • 支持Docker一键部署,支持K8S 部署运行;
  • 支持多种异构数据源,可支持MySQL、Oracle、SQLServer、Hive、Kudu等20多种数据源的同步与计算;
  • 易拓展,高灵活性,新拓展的数据源插件可以与现有数据源插件即时互通,插件开发者不需要关心其他插件的代码逻辑;
  • 不仅仅支持全量同步,还支持增量同步、间隔轮训;
  • 批流一体,不仅仅支持离线同步及计算,还兼容实时场景;
  • 支持脏数据存储,并提供指标监控等;
  • 配合checkpoint实现断点续传;
  • 不仅仅支持同步DML数据,还支持Schema变更同步。

其实它的基于json模板配置有点像DataX,之前博主也写过关于DataX的教程,有兴趣的同学可以参阅《DataX专栏》

ok,接下来,根据教程使用Chunjun来实现MySQL同步到MySQL的功能。

02 使用FlinkX实现MySQL同步至MySQL

2.1 源码编译

首先,我们先clone 纯钧的源码,为了提高下载速度,可以直接在clone gitee的仓库:https://gitee.com/dtstack_dev_0/chunjun.git

clone完成后,可以看到Chunjun的目录结构如下(已备注):

- bin                                # 存放执行脚本的目录
  ├── chunjun-docker.sh                  # Docker 启动脚本
  ├── chunjun-kubernetes-application.sh  # Kubernetes 应用模式启动脚本
  ├── chunjun-kubernetes-session.sh      # Kubernetes 会话模式启动脚本
  ├── chunjun-local.sh                   # 本地启动脚本
  ├── chunjun-standalone.sh              # 单机模式启动脚本
  ├── chunjun-yarn-perjob.sh             # YARN 每作业模式启动脚本
  ├── chunjun-yarn-session.sh            # YARN 会话模式启动脚本
  ├── start-chunjun                      # 通用启动脚本
  └── submit.sh                          # 提交任务脚本
- build                                  # 构建脚本目录
  └── build.sh                           # 构建脚本
- chunjun-assembly                       # 汇总装配模块目录
- chunjun-clients                        # 客户端模块目录
- chunjun-connectors                     # 连接器模块目录
  ├── (多个子目录)                         # 不同的数据连接器子模块
- chunjun-core                           # 核心模块目录
- chunjun-ddl                            # 数据定义语言模块目录
  ├── chunjun-ddl-base                   # DDL 基础模块
  ├── chunjun-ddl-mysql                  # MySQL DDL 模块
  ├── chunjun-ddl-oracle                 # Oracle DDL 模块
- chunjun-dev                            # 开发工具模块目录
  ├── (多个子目录)                         # 包含开发用的各种工具和资源
- chunjun-dirty                          # 脏数据处理模块目录
  ├── (多个子目录)                         # 不同的脏数据处理子模块
- chunjun-docker                         # Docker 相关模块目录
  ├── (多个子目录)                         # Docker 相关资源和配置
- chunjun-e2e                            # 端到端测试模块目录
- chunjun-examples                       # 示例模块目录
  ├── json                               # JSON 示例
  └── sql                                # SQL 示例
- chunjun-local-test                     # 本地测试模块目录
- chunjun-metrics                        # 指标监控模块目录
  ├── (多个子目录)                         # 包含不同的监控模块
- chunjun-restore                        # 数据恢复模块目录
  ├── chunjun-restore-common             # 通用数据恢复模块
  └── chunjun-restore-mysql              # MySQL 数据恢复模块

编译前提:JDK和MAVEN,此处不再详述。

执行编译命令:

mvn clean package -DskipTests

编译成功:

可以在项目目录下发现所有的资源在output目录:

我们解压看看这个目录有什么内容:

├── bin
│   ├── chunjun-docker.sh
│   ├── chunjun-kubernetes-application.sh
│   ├── chunjun-kubernetes-session.sh
│   ├── chunjun-local.sh
│   ├── chunjun-standalone.sh
│   ├── chunjun-yarn-perjob.sh
│   ├── chunjun-yarn-session.sh
│   ├── start-chunjun
│   └── submit.sh
├── chunjun-dist
│   ├── chunjun-core.jar
│   ├── connector
│   ├── ddl
│   ├── dirty-data-collector
│   ├── docker-build
│   ├── metrics
│   └── restore-plugins
├── chunjun-examples
│   ├── json
│   └── sql
└── lib
    ├── chunjun-clients.jar
    ├── log4j-1.2-api-2.19.0.jar
    ├── log4j-api-2.19.0.jar
    ├── log4j-core-2.19.0.jar
    └── log4j-slf4j-impl-2.19.0.jar

ok,有了以上的内容,可以提交一个任务了。任务提交类型有:local(默认值),standaloneyarn-sessionyarn-per-jobkubernetes-sessionkubernetes-application,具体可以参考ClusterMode类:

那接下来使用local模式来提交。

2.2 提交任务

我们可以直接修改的内容来跑例子(位置“解压目录/chunjun-examples/json/mysql”):

直接修改“mysql_mysql_realtime.json”里面的内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:32306/test?useSSL=false"
                ],
                "table": [
                  "t_user"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:32306/test?useSSL=false",
                "table": [
                  "t_user_copy"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "uniqueKey": ["id"],
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

提交作业:

cd 安装目录/bin
sh chunjun-local.sh -job ../chunjun-examples/json/mysql/mysql_mysql_realtime.json

启动时,会打印:

启动结束后(是不是感觉和DataX很像):

可以看到,已经同步t_user表的数据至t_user_copy表:

03 文末

ok,本文主要简单讲解了FlinkX的简单使用,后续会继续讲解它的原理以及源码。希望能帮助到大家,谢谢大家的阅读,本文完。

目录
相关文章
|
3月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
154 0
|
5月前
|
移动开发 Java 测试技术
HarmonyOS NEXT~鸿蒙系统与mPaaS三方框架集成指南
本文详细介绍了鸿蒙系统(HarmonyOS)与mPaaS框架的集成方法。鸿蒙系统作为华为开发的分布式操作系统,具备分布式架构、微内核设计等特性;mPaaS是蚂蚁金服推出的移动开发平台,提供金融级组件和全生命周期管理能力。文章从环境准备、核心功能集成(如初始化、用户认证、支付功能)、适配问题解决到调试测试及最佳实践,全方位指导开发者高效集成两者。通过遵循指南,可充分利用鸿蒙的特性和mPaaS的金融能力,构建高性能、高安全性的应用,同时避免常见兼容性问题,缩短开发周期。
254 0
|
9月前
|
人工智能 达摩院 并行计算
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
VideoRefer 是浙江大学与阿里达摩学院联合推出的视频对象感知与推理技术,支持细粒度视频对象理解、复杂关系分析及多模态交互,适用于视频剪辑、教育、安防等多个领域。
486 17
VideoRefer:阿里达摩院开源视频对象感知与推理框架,可集成 VLLM 提升其空间和时间理解能力
|
10月前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
246 3
|
11月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
242 2
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
3288 2
Flink CDC:新一代实时数据集成框架
|
12月前
|
Java 程序员 API
Android|集成 slf4j + logback 作为日志框架
做个简单改造,统一 Android APP 和 Java 后端项目打印日志的体验。
522 1
|
监控 Devops 测试技术
DevOps实践: 持续集成和持续部署(CI/CD)的入门指南
【9月更文挑战第10天】在快速迭代的软件开发世界中,DevOps已经成为加速产品交付、提升软件质量和团队协作的关键策略。本文将深入浅出地介绍DevOps的核心组成部分——持续集成(Continuous Integration, CI)与持续部署(Continuous Deployment, CD)的基本概念、实施步骤以及它们如何革新传统的软件开发流程。你将学习到如何通过自动化工具简化开发流程,并理解为什么CI/CD是现代软件开发不可或缺的一环。
|
12月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
620 0
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
181 0