【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 什么是 CDC ?CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.要解决什么问题 ?

什么是 CDC ?


CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.


要解决什么问题 ?


使用 flink sql 进行数据同步,可以将数据从一个数据同步到其他的地方,比如 mysql、elasticsearch 等。


可以在源数据库上实时的物化一个聚合视图

因为只是增量同步,所以可以实时的低延迟的同步数据

使用 EventTime join 一个 temporal 表以便可以获取准确的结果

开启 Mysql Binlog

mysql 的 binlog 默认是关闭的,我们需要先把它开启,配置非常简单.


# 开启binlog日志
log-bin=mysql-bin
binlog_format=ROW
server_id=142


只需要配置这几个参数就可以了,还有很多可选的配置,自己也可以根据需要添加.


添加 pom 依赖

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>


定义 DDL

CREATE TABLE mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master',
  'port' = '3306',
  'username' = 'mysql',
  'password' = '12345678',
  'database-name' = 'test',
  'table-name' = 'ab',
  'debezium.snapshot.mode' = 'initial'
)
CREATE TABLE kafka_mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test1',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
 'format' = 'debezium-json'
)
insert into kafka_mysql_cdc
select * from mysql_cdc


debezium-json 格式化


定义了从 mysql 读取数据并写入到 kafka 中,格式化方式是 debezium-json 然后启动任务看一下数据


{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}
{"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"}
{"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}


我这里用的是 debezium-json 来格式化数据,第一次会全量读取表里的数据,可以看到只有 3 条数据, before 表示的是修改之前的数据,after 表示的是修改之后的数据,op 表示的是操作的类型.然后我先向 mysql 添加一条新的数据.


INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);


消费到的数据:


{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}


然后再来修改一条数据:


UPDATE ab set age = '00' WHERE name = 'JasonLee';


消费到的数据:


{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}
{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}


可以看到消费到了两条数据,因为在 Flink 里面 update 操作会被翻译成 delete 和 insert 操作,第一条数据的 before 是修改之前的数据,op 的类型是 d(delete),第二条数据的 before 置为了 null, after 表示的是修改之后的数据,之前的 age 是 100,修改之后是 0 ,op 的类型是 c(create).


canal-json 格式化


只需要把上面 DDL 中的 format 改为 canal-json 即可.然后重启一下任务,消费到的数据如下:


{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"}
{"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"}
{"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"}
{"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


我们的数据是放在 data 里面,然后 type 代表了操作的类型.第一次加载的时候全部都是 INSERT 类型的数据,然后我再向 mysql 插入一条新数据


INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


然后再来修改一条数据:


UPDATE ab set age = '20' WHERE name = 'clickhouse';


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}
{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


同样的还是消费到了两条数据,第一条是 DELETE 之前的数据,第二条是 INSERT 修改后的数据,可以看到 age 也由 0 变成了 20 .


CDC 优点

开箱即用,简单易上手

减少维护的组件,简化实时链路,减轻部署成本

减小端到端延迟

Flink 自身支持 Exactly Once 的读取和计算

数据不落地,减少存储成本

支持全量和增量流式读取

binlog 采集位点可回溯

目录
打赏
0
0
0
0
13
分享
相关文章
MySQL进阶突击系列(07) 她气鼓鼓递来一条SQL | 怎么看执行计划、SQL怎么优化?
在日常研发工作当中,系统性能优化,从大的方面来看主要涉及基础平台优化、业务系统性能优化、数据库优化。面对数据库优化,除了DBA在集群性能、服务器调优需要投入精力,我们研发需要负责业务SQL执行优化。当业务数据量达到一定规模后,SQL执行效率可能就会出现瓶颈,影响系统业务响应。掌握如何判断SQL执行慢、以及如何分析SQL执行计划、优化SQL的技能,在工作中解决SQL性能问题显得非常关键。
实现MySQL与SQL Server之间数据迁移的有效方法
总的来说,从MySQL到SQL Server的数据迁移是一个涉及到很多步骤的过程,可能会遇到各种问题和挑战。但只要精心规划、仔细执行,这个任务是完全可以完成的。
118 18
【MySQL】SQL分析的几种方法
以上就是SQL分析的几种方法。需要注意的是,这些方法并不是孤立的,而是相互关联的。在实际的SQL分析中,我们通常需要结合使用这些方法,才能找出最佳的优化策略。同时,SQL分析也需要对数据库管理系统,数据,业务需求有深入的理解,这需要时间和经验的积累。
103 12
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
MySQL原理简介—1.SQL的执行流程
本文介绍了MySQL驱动、数据库连接池及SQL执行流程的关键组件和作用。主要内容包括:MySQL驱动用于建立Java系统与数据库的网络连接;数据库连接池提高多线程并发访问效率;MySQL中的连接池维护多个数据库连接并进行权限验证;网络连接由线程处理,监听请求并读取数据;SQL接口负责执行SQL语句;查询解析器将SQL语句解析为可执行逻辑;查询优化器选择最优查询路径;存储引擎接口负责实际的数据操作;执行器根据优化后的执行计划调用存储引擎接口完成SQL语句的执行。整个流程确保了高效、安全地处理SQL请求。
354 77
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
【YashanDB知识库】如何将mysql含有group by的SQL转换成崖山支持的SQL
本文探讨了在YashanDB(崖山数据库)中执行某些SQL语句时出现的报错问题,对比了MySQL的成功执行结果。问题源于SQL-92标准对非聚合列的严格限制,要求这些列必须出现在GROUP BY子句中,而SQL:1999及更高版本允许非聚合列直接出现在选择列中。YashanDB和Oracle遵循SQL-92标准,因此会报错。文章提供了两种解决方法:使用聚合函数处理非聚合列,或将GROUP BY与ORDER BY拆分为两层查询。最后总结指出,SQL-92标准更为严谨合理,建议开发者遵循此规范以避免潜在问题。
基于SQL Server / MySQL进行百万条数据过滤优化方案
对百万级别数据进行高效过滤查询,需要综合使用索引、查询优化、表分区、统计信息和视图等技术手段。通过合理的数据库设计和查询优化,可以显著提升查询性能,确保系统的高效稳定运行。
179 9
MySQL的架构与SQL语句执行过程
MySQL架构分为Server层和存储引擎层,具有高度灵活性和可扩展性。Server层包括连接器、查询缓存(MySQL 8.0已移除)、分析器、优化器和执行器,负责处理SQL语句;存储引擎层负责数据的存储和读取,常见引擎有InnoDB、MyISAM和Memory。SQL执行过程涉及连接、解析、优化、执行和结果返回等步骤,本文详细讲解了一条SQL语句的完整执行过程。
189 3
【YashanDB 知识库】如何将 mysql 含有 group by 的 SQL 转换成崖山支持的 SQL
在崖山数据库中执行某些 SQL 语句时出现报错(YAS-04316 not a single-group group function),而这些语句在 MySQL 中能成功执行。原因是崖山遵循 SQL-92 标准,不允许选择列表中包含未在 GROUP BY 子句中指定的非聚合列,而 MySQL 默认允许这种操作。解决办法包括:使用聚合函数处理非聚合列或拆分查询为两层,先进行 GROUP BY 再排序。总结来说,SQL-92 更严格,确保数据一致性,MySQL 在 5.7 及以上版本也默认遵循此标准。

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问