Flink CDC使用(数据采集CDC方案比较)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: cdc 使用场景场景cdc方案对比flink cdc 的基本使用

什么是CDC


CDC是Change Data Capture(变更数据捕获)的简称。其核⼼原理是监测并捕获数据库的变动(增删改等),将 这些变更按发⽣的顺序捕获,当然也可以写⼊到消息队列中供其他服务消费


cdc使用场景


image.png


image.png


cdc 实现

实现CDC即捕获数据库的变更数据有两种机制:

比较项

基于查询实现CDC

基于日志实现CDC

典型产品

Sqoop、DataX等

Canal、Debezium等

执⾏模式

批处理

流处理

捕获所有数据变化

NO

YES

低延迟

NO

YES

不增加数据库负载

NO

YES

不侵⼊业务(不需要lastUpdate字段)

NO

YES

捕获删除事件

NO

YES

捕获旧记录的状态

NO

YES


Flink CDC


Flink CDC是Flink社区开发的flink-cdc-connectors 组件,这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接 读取全量数据和增量变更数据的 source 组件。 ⽬前也已开源,开源地址:

https://github.com/ververica/flink-cdc-connectors

https://ververica.github.io/flink-cdc-connectors/master/



⽀持的连接器


image.png

支持Flink 版本


image.png


Flink CDC 优势


传统的cdc不足:


image.png


传统的基于 CDC 的 ETL 分析中,数据采集⼯具是必须的,国外⽤户常⽤ Debezium,国内⽤户常⽤阿⾥开源的 Canal,采集⼯具负责采集数据库的增量数据,⼀些采集⼯具也⽀持同步全量数据。采集到的数据⼀般输出到消息 中间件如 Kafka,然后 Flink 计算引擎再去消费这⼀部分数据写⼊到⽬的端,⽬的端可以是各种 DB,数据湖,实时 数仓和离线数仓。



注意,Flink 提供了 changelog-json format,可以将 changelog 数据写⼊离线数仓如 Hive / HDFS;对于实时数 仓,Flink ⽀持将 changelog 通过 upsert-kafka connector 直接写⼊ Kafka。




image.png


Flink CDC的基本理念就是去替换上图中红色线框内的采集组件和消息队列,从⽽简化传输链路,降低维护成本。同 时更少的组件也意味着数据时效性能够进⼀步提⾼。


Flink cdc采集方案


image.png



基于FlinkCDC,我们只需要通过⼀个 Flink SQL 作业就完成了 CDC 的数据采集,加⼯和同步,下⾯是⼀个例⼦:



--需求:同步MySQL的orders表到TiDB的orders表--1、定义MySQL中orders表的cdc源表CREATETABLEmysql_orders (
idINTNOTNULL,
product_idBIGINT,
...
PRIMARYKEY(id)
) WITH (
'connector'='mysql-cdc',
'hostname'='xx',
'port'='3306',
'username'='xx',
'password'='xx',
'database-name'='xx',
'table-name'='orders');
--2、创建TiDB结果表CREATETABLEtidb_orders(
idINTNOTNULL,
product_idBIGINT,
...
PRIMARYKEY(id)
)
WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:3306/xx',
'table-name'='orders');
--3、从源表读取数据写⼊结果表INSERTINTOtidb_ordersSELECT*FROMmysql_orders


所以基于Flink CDC的⽅案是⼀个纯 SQL 作业,⼤⼤降低了降低了使⽤⻔槛。当然,我们也可以利⽤ Flink SQL 提 供的丰富语法进⾏数据清洗、分析、聚合,⽽不仅仅是简单的数据同步。利⽤ Flink SQL 双流 JOIN、维表 JOIN、 UDTF 语法可以⾮常容易地完成实时打宽,以及各种业务逻辑加⼯。



image.png



常见CDC方案比较



开源方案

Flink CDC

Debezium

DataX


Canal

Sqoop

Kettle

Oracle Goldengate(OGG)

CDC机制

日志

日志

查询

日志(mysql)

查询

查询

日志(oracle)

增量同步

✓(侵入业务&T+1)

(侵入业务&T+1)

(侵入业务&T+1)

断点续传

全量同步

全量+增量

架构

分布式

单机

分布式

单机

分布式

分布式

分布式

数据转换/清 洗

较弱











对⽐增量同步能⼒

  • 基于⽇志的⽅式,可以很好的做到增量同步(准实时);
  • ⽽基于查询的⽅式必须侵⼊业务才能做到增量同步的,⽽且是T+1的增量同步



对⽐全量同步能⼒,基于查询或者⽇志的 CDC ⽅案基本都⽀持,除了 Canal。



⽽对⽐全量 + 增量同步的能⼒,只有 Flink CDC、Debezium、Oracle Goldengate ⽀持较好。



从架构⻆度去看,该表将架构分为单机和分布式,这⾥的分布式架构不单纯体现在数据读取能⼒的⽔平扩展 上,更重要的是在⼤数据场景下分布式系统接⼊能⼒。例如 Flink CDC 的数据⼊湖或者⼊仓的时候,下游通常 是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接⼊分布式系统能⼒上看,Flink CDC 的架 构能够很好地接⼊此类系



在数据转换 / 数据清洗能⼒上,当数据进⼊到 CDC ⼯具的时候是否能较⽅便的对数据做⼀些过滤或者清洗, 甚⾄聚合在


  • Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些
  • 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以⽤户的使⽤⻔槛会⽐较⾼


另外,在⽣态⽅⾯,这⾥指的是下游的⼀些数据库或者数据源的⽀持。Flink CDC 下游有丰富的 Connector, 例如写⼊到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常⻅的⼀些系统,也⽀持各种⾃定义 connector。



使用方式1:DataStream API


引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-json --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version></dependency>


样本代码


publicclassFlinkCDCSimple {
publicstaticvoidmain(String[] args) throwsException {
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpoint(⽣产上是分钟级)env.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointTimeout(20000L);
//指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDCbackend");
MySqlSource<String>mySqlSource=MySqlSource.<String>builder()
 .hostname("node01")
 .port(3306)
 .databaseList("cm")
 .tableList("cm.music_style")
 .username("flinkcdc")
 .password("flinkcdc%123")
 .startupOptions(StartupOptions.initial())
 .deserializer(newJsonDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString// .deserializer(new StringDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString .build();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks .setParallelism(4)
 .print()
env.execute("")


Flink CDC 状态与容错


//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpointenv.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointTimeout(20000L);
//指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDC-backend");



使用方式2:Flink SQL


https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html


依赖管理


将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar放到FLINK_HOME/lib下


Flink 全局配置

修改flink-conf.yaml⽂件:

execution.target: yarn-per-job#execution.checkpointing.interval: 3minexecution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONexecution.checkpointing.max-concurrent-checkpoints: 1#execution.checkpointing.min-pause: 0execution.checkpointing.mode: EXACTLY_ONCEexecution.checkpointing.timeout: 10min#execution.checkpointing.tolerable-failed-checkpoints: 0#execution.checkpointing.unaligned: false##Supportedbackendsare'jobmanager', 'filesystem', 'rocksdb', orthe#<class-name-of-factory>.
#state.backend: filesystem#Directoryforcheckpointsfilesystem, whenusinganyofthedefaultbundled#statebackends.
#state.checkpoints.dir:hdfs://node01:8020/flinkCDC-checkpoints


sql 代码



SET'execution.checkpointing.interval'='10s';
SET'parallelism.default'='3';
CREATETABLEmusic_style (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='mysql-cdc',
'hostname'='node01',
'port'='3306',
'username'='flinkcdc',
'password'='flinkcdc%123',
'database-name'='cm',
'table-name'='music_style',
'connect.timeout'='60s');
CREATETABLEmusic_style_copy (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://node01:3306/test?useUnicode=true&characterEncoding=utf8',
'username'='root',
'password'='root%123',
'table-name'='music_style_copy',
'sink.parallelism'='2');
insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;




canal job保存save point


flinkstop--savepointPathhdfs://node01:8020/flinkCDC-savepoints -Dyarn.application.id=application_1648361146632_0011e2eebb08b4dac374788f38b290bcf1cf



cacal job之后重新恢复job

SET'execution.checkpointing.interval'='10s';
SET'parallelism.default'='3';
SET'execution.savepoint.path'='hdfs://node01:8020/flinkCDC-savepoints/savepointe2eebb-3065002c8658';
CREATETABLEmusic_style (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='mysql-cdc',
'hostname'='node01',
'port'='3306',
'username'='flinkcdc',
'password'='flinkcdc%123',
'database-name'='cm',
'table-name'='music_style',
'connect.timeout'='60s');
CREATETABLEmusic_style_copy (
music_style_idBIGINT,
style_nameSTRING,
PRIMARYKEY(music_style_id) NOTENFORCED) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://node01:3306/test',
'username'='root',
'password'='root%123',
'table-name'='music_style_copy');
insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;



Flink CDC 2.xx设计


参考:https://blog.csdn.net/qq_30438573/article/details/119078255






相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 Java Kafka
Flink CDC 在外部查询某个 job 中的表数据
【2月更文挑战第27天】Flink CDC 在外部查询某个 job 中的表数据
44 5
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
52 3
|
1月前
|
自然语言处理 Java Scala
Flink CDC产品常见问题之大文件整库同步怎么解决
Flink CDC产品常见问题之大文件整库同步怎么解决
|
1月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
20 2
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1421 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
22 2
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
484 5
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1357 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3