【FlinkSQL实战系列】Flink SQL CDC 实时同步 Mysql 的 Binlog 数据到 kafka

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 什么是 CDC ?CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.要解决什么问题 ?

什么是 CDC ?


CDC,Change Data Capture,变更数据获取的简称,使用 CDC 我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括 INSERT,DELETE,UPDATE 等.


要解决什么问题 ?


使用 flink sql 进行数据同步,可以将数据从一个数据同步到其他的地方,比如 mysql、elasticsearch 等。


可以在源数据库上实时的物化一个聚合视图

因为只是增量同步,所以可以实时的低延迟的同步数据

使用 EventTime join 一个 temporal 表以便可以获取准确的结果

开启 Mysql Binlog

mysql 的 binlog 默认是关闭的,我们需要先把它开启,配置非常简单.


# 开启binlog日志
log-bin=mysql-bin
binlog_format=ROW
server_id=142


只需要配置这几个参数就可以了,还有很多可选的配置,自己也可以根据需要添加.


添加 pom 依赖

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>


定义 DDL

CREATE TABLE mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'master',
  'port' = '3306',
  'username' = 'mysql',
  'password' = '12345678',
  'database-name' = 'test',
  'table-name' = 'ab',
  'debezium.snapshot.mode' = 'initial'
)
CREATE TABLE kafka_mysql_cdc (
  name STRING,
  age INT,
  city STRING,
  phone STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test1',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
 'format' = 'debezium-json'
)
insert into kafka_mysql_cdc
select * from mysql_cdc


debezium-json 格式化


定义了从 mysql 读取数据并写入到 kafka 中,格式化方式是 debezium-json 然后启动任务看一下数据


{"before":null,"after":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"op":"c"}
{"before":null,"after":{"name":"spark","age":25,"city":"beijing","phone":"17610775718"},"op":"c"}
{"before":null,"after":{"name":"Flink","age":100,"city":"beijing","phone":"111111"},"op":"c"}


我这里用的是 debezium-json 来格式化数据,第一次会全量读取表里的数据,可以看到只有 3 条数据, before 表示的是修改之前的数据,after 表示的是修改之后的数据,op 表示的是操作的类型.然后我先向 mysql 添加一条新的数据.


INSERT INTO ab(name,age,city,phone) VALUES ('hadoop',00,'shanghai',778899);


消费到的数据:


{"before":null,"after":{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"},"op":"c"}


然后再来修改一条数据:


UPDATE ab set age = '00' WHERE name = 'JasonLee';


消费到的数据:


{"before":{"name":"JasonLee","age":100,"city":"beijing","phone":"16345646"},"after":null,"op":"d"}
{"before":null,"after":{"name":"JasonLee","age":0,"city":"beijing","phone":"16345646"},"op":"c"}


可以看到消费到了两条数据,因为在 Flink 里面 update 操作会被翻译成 delete 和 insert 操作,第一条数据的 before 是修改之前的数据,op 的类型是 d(delete),第二条数据的 before 置为了 null, after 表示的是修改之后的数据,之前的 age 是 100,修改之后是 0 ,op 的类型是 c(create).


canal-json 格式化


只需要把上面 DDL 中的 format 改为 canal-json 即可.然后重启一下任务,消费到的数据如下:


{"data":[{"name":"JasonLee","age":2,"city":"beijing","phone":"16345646"}],"type":"INSERT"}
{"data":[{"name":"spark","age":25,"city":"beijing","phone":"17610775718"}],"type":"INSERT"}
{"data":[{"name":"Flink","age":100,"city":"beijing","phone":"111111"}],"type":"INSERT"}
{"data":[{"name":"hadoop","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hive","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"hbase","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}
{"data":[{"name":"kafka","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


我们的数据是放在 data 里面,然后 type 代表了操作的类型.第一次加载的时候全部都是 INSERT 类型的数据,然后我再向 mysql 插入一条新数据


INSERT INTO ab(name,age,city,phone) VALUES ('clickhouse',00,'shanghai',778899);


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


然后再来修改一条数据:


UPDATE ab set age = '20' WHERE name = 'clickhouse';


消费到的数据:


{"data":[{"name":"clickhouse","age":0,"city":"shanghai","phone":"778899"}],"type":"DELETE"}
{"data":[{"name":"clickhouse","age":20,"city":"shanghai","phone":"778899"}],"type":"INSERT"}


同样的还是消费到了两条数据,第一条是 DELETE 之前的数据,第二条是 INSERT 修改后的数据,可以看到 age 也由 0 变成了 20 .


CDC 优点

开箱即用,简单易上手

减少维护的组件,简化实时链路,减轻部署成本

减小端到端延迟

Flink 自身支持 Exactly Once 的读取和计算

数据不落地,减少存储成本

支持全量和增量流式读取

binlog 采集位点可回溯

相关文章
|
5天前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
38 11
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何指定从特定的binlog位置或最新的binlog位置开始读取数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 算法 关系型数据库
实时计算 Flink版产品使用合集之全量历史数据比较多,全量同步阶段时间长,是否会同时读取binlog进行合并输出
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
47 2
|
5月前
|
SQL 运维 数据处理
实时计算 Flink版产品使用合集之从指定的MySQLbinlog文件读取数据并写入本地文件,但发现任务已经对指定的binlog文件做完检查点并开始处理下一个binlog文件,该怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之如果MySQL的binlog保存时间只有三天,那么三天之前的数据是不是会通过Doris的外表手动插入
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
Flink CDC产品常见问题之用upsert的方式写入kafka失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
86 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
|
2月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
69 0

热门文章

最新文章

下一篇
无影云桌面