什么是CDC
CDC是Change Data Capture(变更数据捕获)的简称。其核⼼原理是监测并捕获数据库的变动(增删改等),将 这些变更按发⽣的顺序捕获,当然也可以写⼊到消息队列中供其他服务消费
cdc使用场景
cdc 实现
实现CDC即捕获数据库的变更数据有两种机制:
比较项 |
基于查询实现CDC |
基于日志实现CDC |
典型产品 | Sqoop、DataX等 |
Canal、Debezium等 |
执⾏模式 |
批处理 |
流处理 |
捕获所有数据变化 |
NO |
YES |
低延迟 |
NO |
YES |
不增加数据库负载 |
NO |
YES |
不侵⼊业务(不需要lastUpdate字段) |
NO |
YES |
捕获删除事件 |
NO |
YES |
捕获旧记录的状态 |
NO |
YES |
Flink CDC
Flink CDC是Flink社区开发的flink-cdc-connectors 组件,这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接 读取全量数据和增量变更数据的 source 组件。 ⽬前也已开源,开源地址:
https://github.com/ververica/flink-cdc-connectors
https://ververica.github.io/flink-cdc-connectors/master/
⽀持的连接器
支持Flink 版本
Flink CDC 优势
传统的cdc不足:
传统的基于 CDC 的 ETL 分析中,数据采集⼯具是必须的,国外⽤户常⽤ Debezium,国内⽤户常⽤阿⾥开源的 Canal,采集⼯具负责采集数据库的增量数据,⼀些采集⼯具也⽀持同步全量数据。采集到的数据⼀般输出到消息 中间件如 Kafka,然后 Flink 计算引擎再去消费这⼀部分数据写⼊到⽬的端,⽬的端可以是各种 DB,数据湖,实时 数仓和离线数仓。
注意,Flink 提供了 changelog-json format,可以将 changelog 数据写⼊离线数仓如 Hive / HDFS;对于实时数 仓,Flink ⽀持将 changelog 通过 upsert-kafka connector 直接写⼊ Kafka。
Flink CDC的基本理念就是去替换上图中红色线框内的采集组件和消息队列,从⽽简化传输链路,降低维护成本。同 时更少的组件也意味着数据时效性能够进⼀步提⾼。
Flink cdc采集方案
基于FlinkCDC,我们只需要通过⼀个 Flink SQL 作业就完成了 CDC 的数据采集,加⼯和同步,下⾯是⼀个例⼦:
--需求:同步MySQL的orders表到TiDB的orders表--1、定义MySQL中orders表的cdc源表CREATETABLEmysql_orders ( idINTNOTNULL, product_idBIGINT, ... PRIMARYKEY(id) ) WITH ( 'connector'='mysql-cdc', 'hostname'='xx', 'port'='3306', 'username'='xx', 'password'='xx', 'database-name'='xx', 'table-name'='orders'); --2、创建TiDB结果表CREATETABLEtidb_orders( idINTNOTNULL, product_idBIGINT, ... PRIMARYKEY(id) ) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://localhost:3306/xx', 'table-name'='orders'); --3、从源表读取数据写⼊结果表INSERTINTOtidb_ordersSELECT*FROMmysql_orders
所以基于Flink CDC的⽅案是⼀个纯 SQL 作业,⼤⼤降低了降低了使⽤⻔槛。当然,我们也可以利⽤ Flink SQL 提 供的丰富语法进⾏数据清洗、分析、聚合,⽽不仅仅是简单的数据同步。利⽤ Flink SQL 双流 JOIN、维表 JOIN、 UDTF 语法可以⾮常容易地完成实时打宽,以及各种业务逻辑加⼯。
常见CDC方案比较
开源方案 |
Flink CDC |
Debezium |
DataX |
Canal |
Sqoop |
Kettle |
Oracle Goldengate(OGG) |
CDC机制 |
日志 |
日志 |
查询 |
日志(mysql) |
查询 |
查询 |
日志(oracle) |
增量同步 |
✓ |
✓ |
✓(侵入业务&T+1) |
✓ |
✓(侵入业务&T+1) |
✓(侵入业务&T+1) |
✓ |
断点续传 |
✓ |
✓ |
✕ |
✓ |
✕ |
✕ |
✓ |
全量同步 |
✓ |
✓ |
✓ |
✕ |
✓ |
✓ |
✓ |
全量+增量 |
✓ |
✓ |
✕ |
✕ |
✕ |
✕ |
✓ |
架构 |
分布式 |
单机 |
分布式 |
单机 |
分布式 |
分布式 |
分布式 |
数据转换/清 洗 |
强 |
弱 |
弱 |
弱 |
弱 |
弱 |
较弱 |
对⽐增量同步能⼒
- 基于⽇志的⽅式,可以很好的做到增量同步(准实时);
- ⽽基于查询的⽅式必须侵⼊业务才能做到增量同步的,⽽且是T+1的增量同步
对⽐全量同步能⼒,基于查询或者⽇志的 CDC ⽅案基本都⽀持,除了 Canal。
⽽对⽐全量 + 增量同步的能⼒,只有 Flink CDC、Debezium、Oracle Goldengate ⽀持较好。
从架构⻆度去看,该表将架构分为单机和分布式,这⾥的分布式架构不单纯体现在数据读取能⼒的⽔平扩展 上,更重要的是在⼤数据场景下分布式系统接⼊能⼒。例如 Flink CDC 的数据⼊湖或者⼊仓的时候,下游通常 是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接⼊分布式系统能⼒上看,Flink CDC 的架 构能够很好地接⼊此类系
在数据转换 / 数据清洗能⼒上,当数据进⼊到 CDC ⼯具的时候是否能较⽅便的对数据做⼀些过滤或者清洗, 甚⾄聚合在
- Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些
- 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以⽤户的使⽤⻔槛会⽐较⾼
另外,在⽣态⽅⾯,这⾥指的是下游的⼀些数据库或者数据源的⽀持。Flink CDC 下游有丰富的 Connector, 例如写⼊到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常⻅的⼀些系统,也⽀持各种⾃定义 connector。
使用方式1:DataStream API
引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!--https://mvnrepository.com/artifact/org.apache.flink/flink-json --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version></dependency>
样本代码
publicclassFlinkCDCSimple { publicstaticvoidmain(String[] args) throwsException { StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpoint(⽣产上是分钟级)env.enableCheckpointing(10000L); env.getCheckpointConfig().setCheckpointTimeout(20000L); //指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); //设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDCbackend"); MySqlSource<String>mySqlSource=MySqlSource.<String>builder() .hostname("node01") .port(3306) .databaseList("cm") .tableList("cm.music_style") .username("flinkcdc") .password("flinkcdc%123") .startupOptions(StartupOptions.initial()) .deserializer(newJsonDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString// .deserializer(new StringDebeziumDeserializationSchema()) // convertsSourceRecordtoJSONString .build(); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print() env.execute("")
Flink CDC 状态与容错
//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//开启Checkpointenv.enableCheckpointing(10000L); env.getCheckpointConfig().setCheckpointTimeout(20000L); //指定Checkpoint的⼀致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置任务关闭的时候保留Checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //指定⾃动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); //设置状态后端env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDC-backend");
使用方式2:Flink SQL
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
依赖管理
将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar放到FLINK_HOME/lib下
Flink 全局配置
修改flink-conf.yaml⽂件:
execution.target: yarn-per-job#execution.checkpointing.interval: 3minexecution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONexecution.checkpointing.max-concurrent-checkpoints: 1#execution.checkpointing.min-pause: 0execution.checkpointing.mode: EXACTLY_ONCEexecution.checkpointing.timeout: 10min#execution.checkpointing.tolerable-failed-checkpoints: 0#execution.checkpointing.unaligned: false##Supportedbackendsare'jobmanager', 'filesystem', 'rocksdb', orthe#<class-name-of-factory>. #state.backend: filesystem#Directoryforcheckpointsfilesystem, whenusinganyofthedefaultbundled#statebackends. #state.checkpoints.dir:hdfs://node01:8020/flinkCDC-checkpoints
sql 代码
SET'execution.checkpointing.interval'='10s'; SET'parallelism.default'='3'; CREATETABLEmusic_style ( music_style_idBIGINT, style_nameSTRING, PRIMARYKEY(music_style_id) NOTENFORCED) WITH ( 'connector'='mysql-cdc', 'hostname'='node01', 'port'='3306', 'username'='flinkcdc', 'password'='flinkcdc%123', 'database-name'='cm', 'table-name'='music_style', 'connect.timeout'='60s'); CREATETABLEmusic_style_copy ( music_style_idBIGINT, style_nameSTRING, PRIMARYKEY(music_style_id) NOTENFORCED) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://node01:3306/test?useUnicode=true&characterEncoding=utf8', 'username'='root', 'password'='root%123', 'table-name'='music_style_copy', 'sink.parallelism'='2'); insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;
canal job保存save point
flinkstop--savepointPathhdfs://node01:8020/flinkCDC-savepoints -Dyarn.application.id=application_1648361146632_0011e2eebb08b4dac374788f38b290bcf1cf
cacal job之后重新恢复job
SET'execution.checkpointing.interval'='10s'; SET'parallelism.default'='3'; SET'execution.savepoint.path'='hdfs://node01:8020/flinkCDC-savepoints/savepointe2eebb-3065002c8658'; CREATETABLEmusic_style ( music_style_idBIGINT, style_nameSTRING, PRIMARYKEY(music_style_id) NOTENFORCED) WITH ( 'connector'='mysql-cdc', 'hostname'='node01', 'port'='3306', 'username'='flinkcdc', 'password'='flinkcdc%123', 'database-name'='cm', 'table-name'='music_style', 'connect.timeout'='60s'); CREATETABLEmusic_style_copy ( music_style_idBIGINT, style_nameSTRING, PRIMARYKEY(music_style_id) NOTENFORCED) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://node01:3306/test', 'username'='root', 'password'='root%123', 'table-name'='music_style_copy'); insertintomusic_style_copySELECT*FROMmusic_style/*+ OPTIONS('server-id'='5401-5404') */ ;
Flink CDC 2.xx设计
参考:https://blog.csdn.net/qq_30438573/article/details/119078255