Flink CDC使用(数据采集CDC方案比较)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: cdc 使用场景场景cdc方案对比flink cdc 的基本使用

什么是CDC


CDC是Change Data Capture(变更数据捕获)的简称。其核⼼原理是监测并捕获数据库的变动(增删改等),将 这些变更按发⽣的顺序捕获,当然也可以写⼊到消息队列中供其他服务消费


cdc使用场景


image.png


image.png


cdc 实现

实现CDC即捕获数据库的变更数据有两种机制:

比较项

基于查询实现CDC

基于日志实现CDC

典型产品

Sqoop、DataX等

Canal、Debezium等

执⾏模式

批处理

流处理

捕获所有数据变化

NO

YES

低延迟

NO

YES

不增加数据库负载

NO

YES

不侵⼊业务(不需要lastUpdate字段)

NO

YES

捕获删除事件

NO

YES

捕获旧记录的状态

NO

YES


Flink CDC


Flink CDC是Flink社区开发的flink-cdc-connectors 组件,这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接 读取全量数据和增量变更数据的 source 组件。 ⽬前也已开源,开源地址:

https://github.com/ververica/flink-cdc-connectors

https://ververica.github.io/flink-cdc-connectors/master/



⽀持的连接器


image.png

支持Flink 版本


image.png


Flink CDC 优势


传统的cdc不足:


image.png


传统的基于 CDC 的 ETL 分析中,数据采集⼯具是必须的,国外⽤户常⽤ Debezium,国内⽤户常⽤阿⾥开源的 Canal,采集⼯具负责采集数据库的增量数据,⼀些采集⼯具也⽀持同步全量数据。采集到的数据⼀般输出到消息 中间件如 Kafka,然后 Flink 计算引擎再去消费这⼀部分数据写⼊到⽬的端,⽬的端可以是各种 DB,数据湖,实时 数仓和离线数仓。



注意,Flink 提供了 changelog-json format,可以将 changelog 数据写⼊离线数仓如 Hive / HDFS;对于实时数 仓,Flink ⽀持将 changelog 通过 upsert-kafka connector 直接写⼊ Kafka。




image.png


Flink CDC的基本理念就是去替换上图中红色线框内的采集组件和消息队列,从⽽简化传输链路,降低维护成本。同 时更少的组件也意味着数据时效性能够进⼀步提⾼。


Flink cdc采集方案


image.png



基于FlinkCDC,我们只需要通过⼀个 Flink SQL 作业就完成了 CDC 的数据采集,加⼯和同步,下⾯是⼀个例⼦:



--需求:同步MySQL的orders表到TiDB的orders表--1、定义MySQL中orders表的cdc源表CREATETABLEmysql_orders (
idINTNOTNULL,
product_idBIGINT,
...
PRIMARYKEY(id)
) WITH (
'connector'='mysql-cdc',
'hostname'='xx',
'port'='3306',
'username'='xx',
'password'='xx',
'database-name'='xx',
'table-name'='orders');
--2、创建TiDB结果表CREATETABLEtidb_orders(
idINTNOTNULL,
product_idBIGINT,
...
PRIMARYKEY(id)
)
WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:3306/xx',
'table-name'='orders');
--3、从源表读取数据写⼊结果表INSERTINTOtidb_ordersSELECT*FROMmysql_orders


所以基于Flink CDC的⽅案是⼀个纯 SQL 作业,⼤⼤降低了降低了使⽤⻔槛。当然,我们也可以利⽤ Flink SQL 提 供的丰富语法进⾏数据清洗、分析、聚合,⽽不仅仅是简单的数据同步。利⽤ Flink SQL 双流 JOIN、维表 JOIN、 UDTF 语法可以⾮常容易地完成实时打宽,以及各种业务逻辑加⼯。



image.png



常见CDC方案比较



开源方案

Flink CDC

Debezium

DataX


Canal

Sqoop

Kettle

Oracle Goldengate(OGG)

CDC机制

日志

日志

查询

日志(mysql)

查询

查询

日志(oracle)

增量同步

✓(侵入业务&T+1)

(侵入业务&T+1)

(侵入业务&T+1)

断点续传

全量同步

全量+增量

架构

分布式

单机

分布式

单机

分布式

分布式

分布式

数据转换/清 洗

较弱











对⽐增量同步能⼒

  • 基于⽇志的⽅式,可以很好的做到增量同步(准实时);
  • ⽽基于查询的⽅式必须侵⼊业务才能做到增量同步的,⽽且是T+1的增量同步



对⽐全量同步能⼒,基于查询或者⽇志的 CDC ⽅案基本都⽀持,除了 Canal。



⽽对⽐全量 + 增量同步的能⼒,只有 Flink CDC、Debezium、Oracle Goldengate ⽀持较好。



从架构⻆度去看,该表将架构分为单机和分布式,这⾥的分布式架构不单纯体现在数据读取能⼒的⽔平扩展 上,更重要的是在⼤数据场景下分布式系统接⼊能⼒。例如 Flink CDC 的数据⼊湖或者⼊仓的时候,下游通常 是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接⼊分布式系统能⼒上看,Flink CDC 的架 构能够很好地接⼊此类系



在数据转换 / 数据清洗能⼒上,当数据进⼊到 CDC ⼯具的时候是否能较⽅便的对数据做⼀些过滤或者清洗, 甚⾄聚合在


  • Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些
  • 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以⽤户的使⽤⻔槛会⽐较⾼


另外,在⽣态⽅⾯,这⾥指的是下游的⼀些数据库或者数据源的⽀持。Flink CDC 下游有丰富的 Connector, 例如写⼊到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常⻅的⼀些系统,也⽀持各种⾃定义 connector。



使用方式1:DataStream API


引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-json --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version></dependency>


样本代码


publicclassFlinkCDCSimple {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpoint(⽣产上是分钟级)env.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointTimeout(20000L);
//指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDCbackend");
MySqlSource<String>mySqlSource=MySqlSource.<String>builder()
 .hostname("node01")
 .port(3306)
 .databaseList("cm")
 .tableList("cm.music_style")
 .username("flinkcdc")
 .password("flinkcdc%123")
 .startupOptions(StartupOptions.initial())
 .deserializer(newJsonDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString// .deserializer(new StringDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString .build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks .setParallelism(4)
 .print()
env.execute("")


Flink CDC 状态与容错


//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpointenv.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointTimeout(20000L);
//指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDC-backend");



使用方式2:Flink SQL


https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html


依赖管理


将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar放到FLINK_HOME/lib下


Flink 全局配置

修改flink-conf.yaml⽂件:

execution.target: yarn-per-job#execution.checkpointing.interval: 3minexecution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONexecution.checkpointing.max-concurrent-checkpoints: 1#execution.checkpointing.min-pause: 0execution.checkpointing.mode: EXACTLY_ONCEexecution.checkpointing.timeout: 10min#execution.checkpointing.tolerable-failed-checkpoints: 0#execution.checkpointing.unaligned: false##Supportedbackendsare'jobmanager', 'filesystem', 'rocksdb', orthe#<class-name-of-factory>.
#state.backend: filesystem#Directoryforcheckpointsfilesystem, whenusinganyofthedefaultbundled#statebackends.
#state.checkpoints.dir:hdfs://node01:8020/flinkCDC-checkpoints


sql 代码



SET'execution.checkpointing.interval'='10s';
SET'parallelism.default'='3';
CREATETABLEmusic_style (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='mysql-cdc',
'hostname'='node01',
'port'='3306',
'username'='flinkcdc',
'password'='flinkcdc%123',
'database-name'='cm',
'table-name'='music_style',
'connect.timeout'='60s');
CREATETABLEmusic_style_copy (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://node01:3306/test?useUnicode=true&characterEncoding=utf8',
'username'='root',
'password'='root%123',
'table-name'='music_style_copy',
'sink.parallelism'='2');
insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;




canal job保存save point


flinkstop--savepointPathhdfs://node01:8020/flinkCDC-savepoints -Dyarn.application.id=application_1648361146632_0011e2eebb08b4dac374788f38b290bcf1cf



cacal job之后重新恢复job

SET'execution.checkpointing.interval'='10s';
SET'parallelism.default'='3';
SET'execution.savepoint.path'='hdfs://node01:8020/flinkCDC-savepoints/savepointe2eebb-3065002c8658';
CREATETABLEmusic_style (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='mysql-cdc',
'hostname'='node01',
'port'='3306',
'username'='flinkcdc',
'password'='flinkcdc%123',
'database-name'='cm',
'table-name'='music_style',
'connect.timeout'='60s');
CREATETABLEmusic_style_copy (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://node01:3306/test',
'username'='root',
'password'='root%123',
'table-name'='music_style_copy');
insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;



Flink CDC 2.xx设计


参考:https://blog.csdn.net/qq_30438573/article/details/119078255






相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
34 16
|
1月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
88 9
|
3月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
687 2
Flink CDC:新一代实时数据集成框架
|
3月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
585 14
Flink CDC 在货拉拉的落地与实践
|
4月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
471 13
Flink CDC 在新能源制造业的实践
|
2月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
59 0
|
2月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
54 0
|
7月前
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
7月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
114 2
|
7月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。