基于MySQL Catalog作为CDC源表示例操作

本文涉及的产品
RDS Agent(兼容OpenClaw),2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 基于MySQL Catalog作为CDC源表示例操作

本文主要介绍如何使用Flink CDC 读取MySQL数据写入下游存储,并演示在MySQL源表侧进行相应的增删改操作对下游的更新效果。


准备工作


本文测试使用RDS MySQL数据库进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。基于MySQL Catalog提供的表被作为MySQL CDC源表,因此也需要在MySQL数据库上上开启Binlog等配置,详情请参见配置MySQL


1. 创建Catalog

使用catalog可以直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。需注意当前仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。参考文档,创建MySQL Catalog,如下所示。


使用示例:

INSERTINTO${other_sink_table}SELECT ... FROM`${mysql_catalog}`.`${db_name}`.`${table_name}`/*+ OPTIONS('server-id'='6000-6018') */;


2. 创建MySQL结果

参考云数据库RDS MySQL结果表,创建结果表DDL

CREATETABLEmysql_crud_sink (

id INT,

name varchar(10),

score INT,

PRIMARYKEY (id)

)

WITH (

'connector'='rds',

'tableName'='<yourTablename>',

'userName'='<yourUsername>',

'password'='<yourPassword>', 'url'='<yourUrl>'

);


3. 编写作业

读取MySQL Catlog数据写入到MySQL结果表,并通过Hints设置server_id

注意:由于scan.incremental.snapshot.enabled(增量快照)默认是开启的,但前提必须mysql cdc源表对应物理表设置主键,为了「扩展测试,第4小节」操作展示insert into相同id,mysql_crud_source不设置主键,所以这里需要设置为false。此处配置Hints主要用于解决catalog下读取Dynamic Table无法配置with参数的问题,SQL Hints可参考官方文档



运行演示


1. 启动作业

将mysql-crud作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2. 准备上游数据

进入dms控制台,通过Insert模拟生产数据

insert into mysql_crud_source values(1,'zhangsan',80);

insert into mysql_crud_source values(2,'lisi',85);

insert into mysql_crud_source values(3,'lisi',85);


3. 观察下游结果

进入dms控制台,查询对应表数据,发现数据已经写入


扩展学习


通过以上示例,成功演示了使用Flink CDC 读取MySQL数据写入下游存储的,接下来我们将测试针对上游表进行增删改操作,观察下游表数据更新的效果。


1.  Insert操作

MySQL Catlog注册的上游表新增一条数据:insert into mysql_crud_source values(4,'lisi',85);

然后查询下游存储表,发现数据已成功同步:


2 Delete操作

MySQL Catlog注册的上游表删除一条数据:DELETE FROM mysql_crud_source where id = 4;

然后查询下游存储表,发现结果表数据id为4的记录已被删除


3 Update操作

MySQL Catlog注册的上游表删除一条数据:update mysql_crud_source set score = 100 where id = 1;

然后查询下游存储表,对应结果表id=1的数据,score已被修改为100


4 新增相同id数据

以上演示了MySQL CDC源端数据进行增删改时,结果表同步到的内容,那么当MySQL CDC源新增相同id数据时,结果表中以id作为了主键,那么又会发生什么呢?是对原有id数据做update吗?

我们先来看下当前MySQL Catlog注册的源表的数据:

此时,mysql_crud_source新增一条id为1,score为66的数据:insert into mysql_crud_source values(1,'zhangsan',66);观察mysql_crud_sink结果如下

看起来像是source读取到数据后,sink侧写入mysql_crud_sink遇到相同id(id作为主键)做了update,为了进一步验证我们的猜想,我们通过RDS的SQL洞察和审计功能,查询对应时间段对数据库操作记录并导出csv,查看Flink在此期间对mysql执行了什么操作:

结果发现,在SQL洞察中,有这样一条记录:
INSERT INTO `mysql_crud_sink`(`id`, `name`, `score`) VALUES (1, 'zhangsan', 66) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `score`=VALUES(`score`)


基于「第1、4小节」,可以得出结论:
对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。即存在的主键字段就会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录。
当然,对于写入的物理表,如果DDL中没有声明PRIMARY KEY,即便id相同,也会用insert into方式插入记录,追加数据。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1365 0
|
SQL 运维 关系型数据库
MySQL 中 GRANT 操作会引起复制中断吗?
GRANT 操作并不是一个原子性操作,不管执行成功与否,都会触发一个隐式重载授权表的行为。 在生产环境中需要规范用户创建及授权的操作,不推荐使用 DML 语句去直接变更 mysql.user 表,可能会引发其他的问题,若使用了 DML 语句进行变更, 需要手工执行 flush privileges。
394 4
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
3364 45
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1065 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
JavaScript 关系型数据库 MySQL
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
375 0
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
614 17
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
1742 1
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
存储 关系型数据库 文件存储
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
230 2
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多