Flink CDC 最佳实践(以 MySQL 为例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink CDC 最佳实践(以 MySQL 为例)

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

# MySQL connection configuration
mysql.server-id: 12345
mysql.hostname: localhost
mysql.port: 3306
mysql.username: root
mysql.password: 123456
mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

# Flink CDC Job Configuration
name: mysql-cdc-job
flink.parallelism: 1
flink.checkpoint.interval: 60000
flink.checkpoint.mode: EXACTLY_ONCE
 
# MySQL CDC Source Configuration
debezium.transforms: unwrap
debezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
database.hostname: localhost
database.port: 3306
database.user: root
database.password: 123456
database.history.kafka.bootstrap.servers: localhost:9092
database.history.kafka.topic: mysql-cdc-history
database.server.id: 12345
database.server.name: test
database.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE。


debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrap。database.hostname、database.port、database.user、database.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers 为 Kafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

CREATE DATABASE test_db;
 
CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';
 
GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。


步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

CREATE TABLE test_db.test_table (
  id INT PRIMARY KEY,
  name VARCHAR(30),
  age INT,
  email VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
 
CREATE TABLE test_db.test_table_cdc (
  `database` VARCHAR(100),
  `table` VARCHAR(100),
  `type` VARCHAR(10),
  `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  `before` JSON,
  `after` JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

public static void main(String[] args) throws Exception {
 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
    env.setParallelism(1);
 
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test-group");
 
    JdbcSource<RowData> source = JdbcSource.<RowData>builder()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://localhost:3306/test_db")
            .setUsername("flink_cdc_user")
            .setPassword("password")
            .setQuery("SELECT id, name, age, email FROM test_table")
            .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING))
            .setFetchSize(1000)
            .build();
 
    DataStream<RowData> stream = env.addSource(source);
 

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.
[INFO] Initializing CDC source...
[INFO] CDC source successfully initialized.
[INFO] Starting CDC source...
[INFO] CDC source successfully started.
[INFO] Adding CDC source to Flink job topology...
[INFO] CDC source successfully added to Flink job topology.
[INFO] Starting Flink job...
[INFO] Flink job started successfully.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.
[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.
[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。

相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
209 0
|
7天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
39 16
|
1月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
95 9
|
3月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
697 2
Flink CDC:新一代实时数据集成框架
|
6天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
24 3
|
6天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
29 3
|
6天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
35 2
|
20天前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
140 15
|
13天前
|
SQL 关系型数据库 MySQL
数据库数据恢复—Mysql数据库表记录丢失的数据恢复方案
Mysql数据库故障: Mysql数据库表记录丢失。 Mysql数据库故障表现: 1、Mysql数据库表中无任何数据或只有部分数据。 2、客户端无法查询到完整的信息。
|
20天前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。