什么是 CDC ?
CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.
要解决什么问题 ?
使用 flink sql 进行数据同步,可以将数据从一个数据同步到其他的地方,比如 mysql、elasticsearch 等。
可以在源数据库上实时的物化一个聚合视图
因为只是增量同步,所以可以实时的低延迟的同步数据
使用 EventTime join 一个 temporal 表以便可以获取准确的结果
开启 Mysql Binlog
mysql 的 binlog 默认是关闭的,我们需要先把它开启,配置非常简单.
# 开启binlog日志 log-bin=mysql-bin binlog_format=ROW server_id=142
只需要配置这几个参数就可以了,还有很多可选的配置,自己也可以根据需要添加.
添加 pom 依赖
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.3.0</version> </dependency>
定义 DDL
CREATE TABLE mysql_cdc ( name STRING, age INT, city STRING, phone STRING ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'mysql', 'password' = '12345678', 'database-name' = 'test', 'table-name' = 'ab', 'debezium.snapshot.mode' = 'initial' ) CREATE TABLE kafka_mysql_cdc ( name STRING, age INT, city STRING, phone STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test1', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092', 'format' = 'debezium-json' ) insert into kafka_mysql_cdc select * from mysql_cdc
debezium-json 格式化
定义了从 mysql 读取数据并写入到 kafka 中,格式化方式是 debezium-json 然后启动任务看一下数据
{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"} {"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"} {"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}
我这里用的是 debezium-json 来格式化数据,第一次会全量读取表里的数据,可以看到只有 3 条数据, before 表示的是修改之前的数据,after 表示的是修改之后的数据,op 表示的是操作的类型.然后我先向 mysql 添加一条新的数据.
INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);
消费到的数据:
{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}
然后再来修改一条数据:
UPDATE ab set age = '00' WHERE name = 'JasonLee';
消费到的数据:
{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"} {"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}
可以看到消费到了两条数据,因为在 Flink 里面 update 操作会被翻译成 delete 和 insert 操作,第一条数据的 before 是修改之前的数据,op 的类型是 d(delete),第二条数据的 before 置为了 null, after 表示的是修改之后的数据,之前的 age 是 100,修改之后是 0 ,op 的类型是 c(create).
canal-json 格式化
只需要把上面 DDL 中的 format 改为 canal-json 即可.然后重启一下任务,消费到的数据如下:
{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"} {"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"} {"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"} {"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"} {"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"} {"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"} {"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
我们的数据是放在 data 里面,然后 type 代表了操作的类型.第一次加载的时候全部都是 INSERT 类型的数据,然后我再向 mysql 插入一条新数据
INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);
消费到的数据:
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
然后再来修改一条数据:
UPDATE ab set age = '20' WHERE name = 'clickhouse';
消费到的数据:
{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"} {"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
同样的还是消费到了两条数据,第一条是 DELETE 之前的数据,第二条是 INSERT 修改后的数据,可以看到 age 也由 0 变成了 20 .
CDC 优点
开箱即用,简单易上手
减少维护的组件,简化实时链路,减轻部署成本
减小端到端延迟
Flink 自身支持 Exactly Once 的读取和计算
数据不落地,减少存储成本
支持全量和增量流式读取
binlog 采集位点可回溯