基于MySQL Catalog作为CDC源表示例操作

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 基于MySQL Catalog作为CDC源表示例操作

本文主要介绍如何使用Flink CDC 读取MySQL数据写入下游存储,并演示在MySQL源表侧进行相应的增删改操作对下游的更新效果。


准备工作


本文测试使用RDS MySQL数据库进行演示,依据Flink集群的网络环境(VPC),构建相关上下游数据源,并做好白明单访问策略配置,保证网络链路可连通。基于MySQL Catalog提供的表被作为MySQL CDC源表,因此也需要在MySQL数据库上上开启Binlog等配置,详情请参见配置MySQL


1. 创建Catalog

使用catalog可以直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。需注意当前仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。参考文档,创建MySQL Catalog,如下所示。


使用示例:

INSERTINTO${other_sink_table}SELECT ... FROM`${mysql_catalog}`.`${db_name}`.`${table_name}`/*+ OPTIONS('server-id'='6000-6018') */;


2. 创建MySQL结果

参考云数据库RDS MySQL结果表,创建结果表DDL

CREATETABLEmysql_crud_sink (

id INT,

name varchar(10),

score INT,

PRIMARYKEY (id)

)

WITH (

'connector'='rds',

'tableName'='<yourTablename>',

'userName'='<yourUsername>',

'password'='<yourPassword>', 'url'='<yourUrl>'

);


3. 编写作业

读取MySQL Catlog数据写入到MySQL结果表,并通过Hints设置server_id

注意:由于scan.incremental.snapshot.enabled(增量快照)默认是开启的,但前提必须mysql cdc源表对应物理表设置主键,为了「扩展测试,第4小节」操作展示insert into相同id,mysql_crud_source不设置主键,所以这里需要设置为false。此处配置Hints主要用于解决catalog下读取Dynamic Table无法配置with参数的问题,SQL Hints可参考官方文档



运行演示


1. 启动作业

将mysql-crud作业上线并启动,Flink全托管控制台-作业运维,查看作业,作业正常运行


2. 准备上游数据

进入dms控制台,通过Insert模拟生产数据

insert into mysql_crud_source values(1,'zhangsan',80);

insert into mysql_crud_source values(2,'lisi',85);

insert into mysql_crud_source values(3,'lisi',85);


3. 观察下游结果

进入dms控制台,查询对应表数据,发现数据已经写入


扩展学习


通过以上示例,成功演示了使用Flink CDC 读取MySQL数据写入下游存储的,接下来我们将测试针对上游表进行增删改操作,观察下游表数据更新的效果。


1.  Insert操作

MySQL Catlog注册的上游表新增一条数据:insert into mysql_crud_source values(4,'lisi',85);

然后查询下游存储表,发现数据已成功同步:


2 Delete操作

MySQL Catlog注册的上游表删除一条数据:DELETE FROM mysql_crud_source where id = 4;

然后查询下游存储表,发现结果表数据id为4的记录已被删除


3 Update操作

MySQL Catlog注册的上游表删除一条数据:update mysql_crud_source set score = 100 where id = 1;

然后查询下游存储表,对应结果表id=1的数据,score已被修改为100


4 新增相同id数据

以上演示了MySQL CDC源端数据进行增删改时,结果表同步到的内容,那么当MySQL CDC源新增相同id数据时,结果表中以id作为了主键,那么又会发生什么呢?是对原有id数据做update吗?

我们先来看下当前MySQL Catlog注册的源表的数据:

此时,mysql_crud_source新增一条id为1,score为66的数据:insert into mysql_crud_source values(1,'zhangsan',66);观察mysql_crud_sink结果如下

看起来像是source读取到数据后,sink侧写入mysql_crud_sink遇到相同id(id作为主键)做了update,为了进一步验证我们的猜想,我们通过RDS的SQL洞察和审计功能,查询对应时间段对数据库操作记录并导出csv,查看Flink在此期间对mysql执行了什么操作:

结果发现,在SQL洞察中,有这样一条记录:
INSERT INTO `mysql_crud_sink`(`id`, `name`, `score`) VALUES (1, 'zhangsan', 66) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `name`=VALUES(`name`), `score`=VALUES(`score`)


基于「第1、4小节」,可以得出结论:
对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。即存在的主键字段就会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录。
当然,对于写入的物理表,如果DDL中没有声明PRIMARY KEY,即便id相同,也会用insert into方式插入记录,追加数据。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
5月前
|
SQL 运维 关系型数据库
MySQL 中 GRANT 操作会引起复制中断吗?
GRANT 操作并不是一个原子性操作,不管执行成功与否,都会触发一个隐式重载授权表的行为。 在生产环境中需要规范用户创建及授权的操作,不推荐使用 DML 语句去直接变更 mysql.user 表,可能会引发其他的问题,若使用了 DML 语句进行变更, 需要手工执行 flush privileges。
70 4
|
5月前
|
JavaScript 关系型数据库 MySQL
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
创建nodejs项目并接入mysql,完成用户相关的增删改查的详细操作
69 0
|
4月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
399 1
|
5月前
|
存储 关系型数据库 文件存储
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
面试题MySQL问题之简单的SELECT操作在MVCC下加锁如何解决
51 2
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Java 数据库
MySQL设计规约问题之为什么应尽量避免使用子查询,而可以考虑将其优化为join操作
MySQL设计规约问题之为什么应尽量避免使用子查询,而可以考虑将其优化为join操作
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute操作报错合集之配置mysql数据源querysql模式,同步到MC时遇到报错,该怎么处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
DataWorks 关系型数据库 MySQL
DataWorks操作报错合集之从OceanBase(OB)数据库调度数据到MySQL数据库时遇到连接报错,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。