推荐一款数据同步工具:FlinkX

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: FlinkX是基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移

FlinkX

1 什么是FlinkX

  • FlinkX是基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
template

2 工作原理

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
diagram

3 快速起步

3.1 运行模式

  • 单机模式:对应Flink集群的单机模式
  • standalone模式:对应Flink集群的分布式模式
  • yarn模式:对应Flink集群的yarn模式

3.2 执行环境

  • Java: JDK8及以上
  • Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
  • 操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。

3.3 打包

进入项目根目录,使用maven打包:

mvn clean package -Dmaven.test.skip

打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包

3.4 启动

3.4.1 命令行参数选项

  • model

    • 描述:执行模式,也就是flink集群的工作模式
      • local: 本地模式
      • standalone: 独立部署模式的flink集群
      • yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster"
    • 必选:否
    • 默认值:local
  • job

    • 描述:数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息。
    • 必选:是
    • 默认值:无
  • plugin

    • 描述:插件根目录地址,也就是打包后产生的plugins目录。
    • 必选:是
    • 默认值:无
  • flinkconf

    • 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
    • 必选:否
    • 默认值:无
  • yarnconf

    • 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
    • 必选:否
    • 默认值:无

3.4.2 启动数据同步任务

  • 以本地模式启动数据同步任务
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -plugin /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
  • 以standalone模式启动数据同步任务
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -plugin /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
  • 以yarn模式启动数据同步任务
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -plugin /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

4 数据同步任务模版

从最高空俯视,一个数据同步的构成很简单,如下:

{
    "job": {
        "setting": {...},
        "content": [...]
    }
}

数据同步任务包括一个job元素,而这个元素包括setting和content两部分。

  • setting: 用于配置限速、错误控制和脏数据管理
  • content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)

4.1 setting

    "setting": {
        "speed": {...},
        "errorLimit": {...},
        "dirty": {...}
    }

setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息

4.1.1 speed

            "speed": {
                 "channel": 3,
                 "bytes": 0
            }
  • channel: 任务并发数
  • bytes: 每秒字节数,默认为 Long.MAX_VALUE

4.1.2 errorLimit

            "errorLimit": {
                "record": 10000,
                "percentage": 100
            }
  • record: 出错记录数超过record设置的条数时,任务标记为失败
  • percentage: 当出错记录数超过percentage百分数时,任务标记为失败

4.1.3 dirty

        "dirty": {
                "path": "/tmp",
                "hadoopConfig": {
                    "fs.default.name": "hdfs://ns1",
                    "dfs.nameservices": "ns1",
                    "dfs.ha.namenodes.ns1": "nn1,nn2",
                    "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
                    "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
                    "dfs.ha.automatic-failover.enabled": "true",
                    "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "fs.hdfs.impl.disable.cache": "true"
                }
            }
  • path: 脏数据存放路径
  • hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

4.1.4 restore

"restore": {

        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      }

4.2 content

        "content": [
            {
               "reader": {
                    "name": "...",
                    "parameter": {
                        ...
                    }
                },
               "writer": {
                    "name": "...",
                    "parameter": {
                         ...
                     }
                }
            }
        ]
  • reader: 用于读取数据的插件的信息
  • writer: 用于写入数据的插件的信息

reader和writer包括name和parameter,分别表示插件名称和插件参数

4.3 数据同步任务例子

详见flinkx-examples子工程

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
API Apache 数据库
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 于 2023 年 12 月 7 日重磅推出了其全新的 3.0 版本 ~
109699 8
 Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
1149 0
|
存储 数据采集 数据管理
一体化元数据管理平台——OpenMetadata入门宝典
一体化元数据管理平台——OpenMetadata入门宝典
3360 0
|
数据采集 分布式计算 监控
DataX教程(03)- 源码解读(超详细版)
DataX教程(03)- 源码解读(超详细版)
3853 0
DataX教程(03)- 源码解读(超详细版)
|
安全 Java 数据库连接
基于dataX实现多种数据源数据汇聚(二)
上一篇文章提到在数据中台项目实践过程中,基于dataX实现数据汇聚的一些使用心得,在众多项目中,发现一个趋势,国产数据库的发展趋势,越来越多的企业要求国产化保障核心资产的安全。本章节主要介绍国产数据的安装、连接、与归集的知识。涉及场景的国产数据库如下: 1、达梦 2、人大金仓(后续补充) 3、南大通用(后续补充)
2663 0
基于dataX实现多种数据源数据汇聚(二)
|
SQL 关系型数据库 MySQL
Flink教程(16)- Flink Table与SQL
Flink教程(16)- Flink Table与SQL
580 0
|
SQL 关系型数据库 MySQL
数据集成框架FlinkX(纯钧)入门
数据集成框架FlinkX(纯钧)入门
1000 0
|
消息中间件 存储 Apache
Apache Paimon 表模式最佳实践
Apache Paimon 表模式最佳实践
4099 57
|
数据采集 数据管理 大数据
推荐 | AllData开源数据中台技术分享
AllData数据中台架构师团队全面解析开源项目[alldata](https://github.com/alldatacenter/alldata),涵盖功能设计、架构分析及源码解读。团队分享了项目总结、发展规划,推荐关注公众号“大数据商业驱动引擎”以获取更多信息。他们讨论了数据治理、调度引擎、商业化探索及未来规划,涉及元数据管理、数据安全、Airflow调度引擎等。此外,还介绍了数据平台功能,如用户管理、权限控制,并提到了商业化版本的源码支持。鼓励用户参与社区交流,共同推动项目发展。
推荐 | AllData开源数据中台技术分享