本文主要介绍如何使用Flink CDC 读取MySQL数据写入下游存储,并演示在MySQL源表侧进行相应的增删改操作对下游的更新效果。
准备工作
本文测试使用RDS MySQL数据库进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。基于MySQL Catalog提供的表被作为MySQL CDC源表,因此也需要在MySQL数据库上上开启Binlog等配置,详情请参见配置MySQL。
1. 创建Catalog
使用catalog可以直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。需注意当前仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。参考文档,创建MySQL Catalog,如下所示。
使用示例:
INSERTINTO${other_sink_table}SELECT ... FROM`${mysql_catalog}`.`${db_name}`.`${table_name}`/*+ OPTIONS('server-id'='6000-6018') */;
2. 创建MySQL结果
参考云数据库RDS MySQL结果表,创建结果表DDL
CREATETABLEmysql_crud_sink (
id INT,
name varchar(10),
score INT,
PRIMARYKEY (id)
)
WITH (
'connector'='rds',
'tableName'='<yourTablename>',
'userName'='<yourUsername>',
'password'='<yourPassword>', 'url'='<yourUrl>'
);
3. 编写作业
读取MySQL Catlog数据写入到MySQL结果表,并通过Hints设置server_id
注意:由于scan.incremental.snapshot.enabled(增量快照)默认是开启的,但前提必须mysql cdc源表对应物理表设置主键,为了「扩展测试,第4小节」操作展示insert into相同id,mysql_crud_source不设置主键,所以这里需要设置为false。此处配置Hints主要用于解决catalog下读取Dynamic Table无法配置with参数的问题,SQL Hints可参考官方文档。
运行演示
1. 启动作业
将mysql-crud作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行
2. 准备上游数据
进入dms控制台,通过Insert模拟生产数据
insert into mysql_crud_source values(1,'zhangsan',80);
insert into mysql_crud_source values(2,'lisi',85);
insert into mysql_crud_source values(3,'lisi',85);
3. 观察下游结果
进入dms控制台,查询对应表数据,发现数据已经写入
扩展学习
通过以上示例,成功演示了使用Flink CDC 读取MySQL数据写入下游存储的,接下来我们将测试针对上游表进行增删改操作,观察下游表数据更新的效果。
1. Insert操作
MySQL Catlog注册的上游表新增一条数据:insert into mysql_crud_source values(4,'lisi',85);
然后查询下游存储表,发现数据已成功同步:
2 Delete操作
MySQL Catlog注册的上游表删除一条数据:DELETE FROM mysql_crud_source where id = 4;
然后查询下游存储表,发现结果表数据id为4的记录已被删除
3 Update操作
MySQL Catlog注册的上游表删除一条数据:update mysql_crud_source set score = 100 where id = 1;
然后查询下游存储表,对应结果表id=1的数据,score已被修改为100
4 新增相同id数据
以上演示了MySQL CDC源端数据进行增删改时,结果表同步到的内容,那么当MySQL CDC源新增相同id数据时,结果表中以id作为了主键,那么又会发生什么呢?是对原有id数据做update吗?
我们先来看下当前MySQL Catlog注册的源表的数据:
此时,mysql_crud_source新增一条id为1,score为66的数据:insert into mysql_crud_source values(1,'zhangsan',66);观察mysql_crud_sink结果如下
看起来像是source读取到数据后,sink侧写入mysql_crud_sink遇到相同id(id作为主键)做了update,为了进一步验证我们的猜想,我们通过RDS的SQL洞察和审计功能,查询对应时间段对数据库操作记录并导出csv,查看Flink在此期间对mysql执行了什么操作:
结果发现,在SQL洞察中,有这样一条记录:
INSERT INTO `mysql_crud_sink`(`id`, `name`, `score`) VALUES (1, 'zhangsan', 66) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `score`=VALUES(`score`)
基于「第1、4小节」,可以得出结论:
对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。即存在的主键字段就会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录。
当然,对于写入的物理表,如果DDL中没有声明PRIMARY KEY,即便id相同,也会用insert into方式插入记录,追加数据。