【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDSClaw,2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 什么是 CDC ?CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.要解决什么问题 ?

什么是 CDC ?


CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.


要解决什么问题 ?


使用 flink sql 进行数据同步,可以将数据从一个数据同步到其他的地方,比如 mysql、elasticsearch 等。


可以在源数据库上实时的物化一个聚合视图

因为只是增量同步,所以可以实时的低延迟的同步数据

使用 EventTime join 一个 temporal 表以便可以获取准确的结果

开启 Mysql Binlog

mysql 的 binlog 默认是关闭的,我们需要先把它开启,配置非常简单.


# 开启binlog日志
log-bin=mysql-bin
binlog_format=ROW
server_id=142


只需要配置这几个参数就可以了,还有很多可选的配置,自己也可以根据需要添加.


添加 pom 依赖

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>


定义 DDL

CREATE TABLE mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master',
  'port' = '3306',
  'username' = 'mysql',
  'password' = '12345678',
  'database-name' = 'test',
  'table-name' = 'ab',
  'debezium.snapshot.mode' = 'initial'
)
CREATE TABLE kafka_mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test1',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
 'format' = 'debezium-json'
)
insert into kafka_mysql_cdc
select * from mysql_cdc


debezium-json 格式化


定义了从 mysql 读取数据并写入到 kafka 中,格式化方式是 debezium-json 然后启动任务看一下数据


{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}
{"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"}
{"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}


我这里用的是 debezium-json 来格式化数据,第一次会全量读取表里的数据,可以看到只有 3 条数据, before 表示的是修改之前的数据,after 表示的是修改之后的数据,op 表示的是操作的类型.然后我先向 mysql 添加一条新的数据.


INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);


消费到的数据:


{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}


然后再来修改一条数据:


UPDATE ab set age = '00' WHERE name = 'JasonLee';


消费到的数据:


{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}
{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}


可以看到消费到了两条数据,因为在 Flink 里面 update 操作会被翻译成 delete 和 insert 操作,第一条数据的 before 是修改之前的数据,op 的类型是 d(delete),第二条数据的 before 置为了 null, after 表示的是修改之后的数据,之前的 age 是 100,修改之后是 0 ,op 的类型是 c(create).


canal-json 格式化


只需要把上面 DDL 中的 format 改为 canal-json 即可.然后重启一下任务,消费到的数据如下:


{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"}
{"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"}
{"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"}
{"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


我们的数据是放在 data 里面,然后 type 代表了操作的类型.第一次加载的时候全部都是 INSERT 类型的数据,然后我再向 mysql 插入一条新数据


INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


然后再来修改一条数据:


UPDATE ab set age = '20' WHERE name = 'clickhouse';


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}
{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


同样的还是消费到了两条数据,第一条是 DELETE 之前的数据,第二条是 INSERT 修改后的数据,可以看到 age 也由 0 变成了 20 .


CDC 优点

开箱即用,简单易上手

减少维护的组件,简化实时链路,减轻部署成本

减小端到端延迟

Flink 自身支持 Exactly Once 的读取和计算

数据不落地,减少存储成本

支持全量和增量流式读取

binlog 采集位点可回溯

相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1092 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
505 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1317 0
|
9月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1201 1
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
3307 45
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1031 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
消息中间件 Kafka 流计算
|
8月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
751 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

推荐镜像

更多
下一篇
开通oss服务