基于MySQL Catalog作为CDC源表示例操作

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 基于MySQL Catalog作为CDC源表示例操作

本文主要介绍如何使用Flink CDC 读取MySQL数据写入下游存储,并演示在MySQL源表侧进行相应的增删改操作对下游的更新效果。


准备工作


本文测试使用RDS MySQL数据库进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。基于MySQL Catalog提供的表被作为MySQL CDC源表,因此也需要在MySQL数据库上上开启Binlog等配置,详情请参见配置MySQL


1. 创建Catalog

使用catalog可以直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。需注意当前仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。参考文档,创建MySQL Catalog,如下所示。


使用示例:

INSERTINTO${other_sink_table}SELECT ... FROM`${mysql_catalog}`.`${db_name}`.`${table_name}`/*+ OPTIONS('server-id'='6000-6018') */;


2. 创建MySQL结果

参考云数据库RDS MySQL结果表,创建结果表DDL

CREATETABLEmysql_crud_sink (

id INT,

name varchar(10),

score INT,

PRIMARYKEY (id)

)

WITH (

'connector'='rds',

'tableName'='<yourTablename>',

'userName'='<yourUsername>',

'password'='<yourPassword>', 'url'='<yourUrl>'

);


3. 编写作业

读取MySQL Catlog数据写入到MySQL结果表,并通过Hints设置server_id

注意:由于scan.incremental.snapshot.enabled(增量快照)默认是开启的,但前提必须mysql cdc源表对应物理表设置主键,为了「扩展测试,第4小节」操作展示insert into相同id,mysql_crud_source不设置主键,所以这里需要设置为false。此处配置Hints主要用于解决catalog下读取Dynamic Table无法配置with参数的问题,SQL Hints可参考官方文档



运行演示


1. 启动作业

将mysql-crud作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2. 准备上游数据

进入dms控制台,通过Insert模拟生产数据

insert into mysql_crud_source values(1,'zhangsan',80);

insert into mysql_crud_source values(2,'lisi',85);

insert into mysql_crud_source values(3,'lisi',85);


3. 观察下游结果

进入dms控制台,查询对应表数据,发现数据已经写入


扩展学习


通过以上示例,成功演示了使用Flink CDC 读取MySQL数据写入下游存储的,接下来我们将测试针对上游表进行增删改操作,观察下游表数据更新的效果。


1.  Insert操作

MySQL Catlog注册的上游表新增一条数据:insert into mysql_crud_source values(4,'lisi',85);

然后查询下游存储表,发现数据已成功同步:


2 Delete操作

MySQL Catlog注册的上游表删除一条数据:DELETE FROM mysql_crud_source where id = 4;

然后查询下游存储表,发现结果表数据id为4的记录已被删除


3 Update操作

MySQL Catlog注册的上游表删除一条数据:update mysql_crud_source set score = 100 where id = 1;

然后查询下游存储表,对应结果表id=1的数据,score已被修改为100


4 新增相同id数据

以上演示了MySQL CDC源端数据进行增删改时,结果表同步到的内容,那么当MySQL CDC源新增相同id数据时,结果表中以id作为了主键,那么又会发生什么呢?是对原有id数据做update吗?

我们先来看下当前MySQL Catlog注册的源表的数据:

此时,mysql_crud_source新增一条id为1,score为66的数据:insert into mysql_crud_source values(1,'zhangsan',66);观察mysql_crud_sink结果如下

看起来像是source读取到数据后,sink侧写入mysql_crud_sink遇到相同id(id作为主键)做了update,为了进一步验证我们的猜想,我们通过RDS的SQL洞察和审计功能,查询对应时间段对数据库操作记录并导出csv,查看Flink在此期间对mysql执行了什么操作:

结果发现,在SQL洞察中,有这样一条记录:
INSERT INTO `mysql_crud_sink`(`id`, `name`, `score`) VALUES (1, 'zhangsan', 66) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `score`=VALUES(`score`)


基于「第1、4小节」,可以得出结论:
对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。即存在的主键字段就会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录。
当然,对于写入的物理表,如果DDL中没有声明PRIMARY KEY,即便id相同,也会用insert into方式插入记录,追加数据。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
23天前
|
SQL 关系型数据库 MySQL
python操作mysql
python操作mysql
|
29天前
|
安全 关系型数据库 MySQL
轻松入门MySQL:MySQL8权限管理详解,角色和用户操作实例(18)
轻松入门MySQL:MySQL8权限管理详解,角色和用户操作实例(18)
|
29天前
|
缓存 关系型数据库 MySQL
MySQL查询优化:提速查询效率的13大秘籍(合理使用索引合并、优化配置参数、使用分区优化性能、避免不必要的排序和group by操作)(下)
MySQL查询优化:提速查询效率的13大秘籍(合理使用索引合并、优化配置参数、使用分区优化性能、避免不必要的排序和group by操作)(下)
|
1天前
|
SQL 存储 关系型数据库
【MySQL】DDL的表操作详解:创建&查询&修改&删除
【MySQL】DDL的表操作详解:创建&查询&修改&删除
|
2天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之DataWorks在同步mysql时报错Code:[Framework-02],mysql里面有个json类型字段,是什么原因导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
15 0
|
2天前
|
关系型数据库 MySQL Java
使用shardingjdbc执行MySQL游标操作时报错
使用shardingjdbc执行MySQL游标操作时报错
|
2天前
|
存储 SQL 关系型数据库
mysql查询数据库表大小怎么操作
mysql查询数据库表大小怎么操作
|
2天前
|
SQL 关系型数据库 MySQL
使用Python的pymysql库连接MySQL,执行CRUD操作
使用Python的pymysql库连接MySQL,执行CRUD操作:安装pymysql,然后连接(host=&#39;localhost&#39;,user=&#39;root&#39;,password=&#39;yourpassword&#39;,database=&#39;yourdatabase&#39;),创建游标。查询数据示例:`SELECT * FROM yourtable`;插入数据:`INSERT INTO yourtable...`;更新数据:`UPDATE yourtable SET...`;删除数据:`DELETE FROM yourtable WHERE...`。
9 0
|
3天前
|
SQL 关系型数据库 数据库
【MySQL】:DDL数据库定义与操作
【MySQL】:DDL数据库定义与操作
8 0
|
14天前
|
Oracle 关系型数据库 MySQL
数据库中对时间的操作(mySql、Oracle、pgSql)
数据库中对时间的操作(mySql、Oracle、pgSql)