Flink CDC 最佳实践(以 MySQL 为例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Flink CDC 最佳实践(以 MySQL 为例)

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

# MySQL connection configuration
mysql.server-id: 12345
mysql.hostname: localhost
mysql.port: 3306
mysql.username: root
mysql.password: 123456
mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

# Flink CDC Job Configuration
name: mysql-cdc-job
flink.parallelism: 1
flink.checkpoint.interval: 60000
flink.checkpoint.mode: EXACTLY_ONCE
 
# MySQL CDC Source Configuration
debezium.transforms: unwrap
debezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
database.hostname: localhost
database.port: 3306
database.user: root
database.password: 123456
database.history.kafka.bootstrap.servers: localhost:9092
database.history.kafka.topic: mysql-cdc-history
database.server.id: 12345
database.server.name: test
database.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE。


debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrap。database.hostname、database.port、database.user、database.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers 为 Kafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

CREATE DATABASE test_db;
 
CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';
 
GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。


步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

CREATE TABLE test_db.test_table (
  id INT PRIMARY KEY,
  name VARCHAR(30),
  age INT,
  email VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
 
CREATE TABLE test_db.test_table_cdc (
  `database` VARCHAR(100),
  `table` VARCHAR(100),
  `type` VARCHAR(10),
  `ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  `before` JSON,
  `after` JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

public static void main(String[] args) throws Exception {
 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
    env.setParallelism(1);
 
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test-group");
 
    JdbcSource<RowData> source = JdbcSource.<RowData>builder()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://localhost:3306/test_db")
            .setUsername("flink_cdc_user")
            .setPassword("password")
            .setQuery("SELECT id, name, age, email FROM test_table")
            .setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING))
            .setFetchSize(1000)
            .build();
 
    DataStream<RowData> stream = env.addSource(source);
 

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.
[INFO] Initializing CDC source...
[INFO] CDC source successfully initialized.
[INFO] Starting CDC source...
[INFO] CDC source successfully started.
[INFO] Adding CDC source to Flink job topology...
[INFO] CDC source successfully added to Flink job topology.
[INFO] Starting Flink job...
[INFO] Flink job started successfully.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.
[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.
[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。

目录
打赏
0
0
0
0
15
分享
相关文章
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
163 1
Flink CDC + Hologres高性能数据同步优化实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Docker Compose V2 安装常用数据库MySQL+Mongo
以上内容涵盖了使用 Docker Compose 安装和管理 MySQL 和 MongoDB 的详细步骤,希望对您有所帮助。
164 42
如何排查和解决PHP连接数据库MYSQL失败写锁的问题
通过本文的介绍,您可以系统地了解如何排查和解决PHP连接MySQL数据库失败及写锁问题。通过检查配置、确保服务启动、调整防火墙设置和用户权限,以及识别和解决长时间运行的事务和死锁问题,可以有效地保障应用的稳定运行。
122 25
云数据库:从零到一,构建高可用MySQL集群
在互联网时代,数据成为企业核心资产,传统单机数据库难以满足高并发、高可用需求。云数据库通过弹性扩展、分布式架构等优势解决了这些问题,但也面临数据安全和性能优化挑战。本文介绍了如何从零开始构建高可用MySQL集群,涵盖选择云服务提供商、创建实例、配置高可用架构、数据备份恢复及性能优化等内容,并通过电商平台案例展示了具体应用。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等