基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。

这篇教程将展示如何基于 Flink CDC YAML 快速构建 MySQL 到 Kafka 的流式数据集成作业,包含整库同步、表结构变更同步的演示和特色功能的介绍。

本教程的演示都将在 Flink CDC CLI 中进行,无需一行 Java/Scala 代码,也无需安装 IDE。

准备阶段

准备 Flink Standalone 集群

  1. 下载 Flink 1.19.2,解压后得到 flink-1.19.2 目录。使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.19.2 所在目录。
tar -zxvf  flink-1.19.2-bin-scala_2.12.tgz
export FLINK_HOME=$(pwd)/flink-1.19.2
cd flink-1.19.2
AI 代码解读
  1. 通过在 conf/config.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
execution:
    checkpointing:
        interval: 3000
AI 代码解读
  1. 使用下面的命令启动 Flink 集群。
./bin/start-cluster.sh
AI 代码解读

启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,如下图所示 :

重复执行 start-cluster.sh 可以拉起多个 TaskManager。

注:如果你是云服务器,无法访问本地,需要将 conf/config.yaml 里面 rest.bind-address 和 rest.address的 localhost 改成0.0.0.0,然后使用 公网IP:8081 即可访问。

准备 Docker 环境

使用下面的内容创建一个 docker-compose.yml 文件:

services:
   Zookeeper:
  image: zookeeper:3.7.1
  ports:
    - "2181:2181"
  environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
   Kafka:
  image: bitnami/kafka:2.8.1
  ports:
    - "9092:9092"
    - "9093:9093"
  environment:
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_LISTENERS=PLAINTEXT://:9092
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.67.2:9092
    - KAFKA_ZOOKEEPER_CONNECT=192.168.67.2:2181
   MySQL:
  image: debezium/example-mysql:1.1
  ports:
    - "3306:3306"
  environment:
    - MYSQL_ROOT_PASSWORD=123456
    - MYSQL_USER=mysqluser
    - MYSQL_PASSWORD=mysqlpw
AI 代码解读

注意:文件里面的 192.168.67.2 为内网 IP,可通过 ifconfig 查找。

该 Docker Compose 中包含的组件有:

  • MySQL: 包含商品信息的数据库app_db

  • Kafka: 存储从 MySQL 中根据规则映射过来的结果表

  • Zookeeper:主要用于进行Kafka集群管理和协调

在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d
AI 代码解读

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有组件。你可以通过 docker ps 来观察上述的容器是否正常启动了。

在 MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec MySQL mysql -uroot -p123456
AI 代码解读

创建数据库 app_db和表 orders,products,shipments 并插入数据

-- 创建数据库
 CREATE DATABASE app_db;

 USE app_db;

 -- 创建 orders 表
 CREATE TABLE `orders` (
 `id` INT NOT NULL,
 `price` DECIMAL(10,2) NOT NULL,
 PRIMARY KEY (`id`)
 );

 -- 插入数据
 INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
 INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

 -- 创建 shipments 表
 CREATE TABLE `shipments` (
 `id` INT NOT NULL,
 `city` VARCHAR(255) NOT NULL,
 PRIMARY KEY (`id`)
 );

 -- 插入数据
 INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
 INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

 -- 创建 products 表
 CREATE TABLE `products` (
 `id` INT NOT NULL,
 `product` VARCHAR(255) NOT NULL,
 PRIMARY KEY (`id`)
 );

 -- 插入数据
 INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
 INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
 INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
AI 代码解读

通过 Flink CDC CLI 提交任务

  1. 下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.3.0;

flink-cdc-3.3.0-bin.tar.gz下会包含 bin、lib、log、conf 四个目录。

  1. 下载下面列出的 connector 包,并且移动到 lib 目录下:

您还需要将下面的 Driver 包放在 Flink lib 目录下,或通过 --jar 参数将其传入 Flink CDC CLI,因为 CDC Connectors 不再包含这些 Drivers:

  1. 编写任务配置 yaml 文件

下面给出了一个整库同步的示例文件 mysql-to-kafka.yaml:

################################################################################
# Description: Sync MySQL all tables to Kafka
################################################################################
source:
  type: mysql
  hostname: 0.0.0.0
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: 0.0.0.0:9092
  topic: yaml-mysql-kafka
pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 1
AI 代码解读

其中:source 中的 tables: app_db..* 通过正则匹配同步 app_db 下的所有表。

  1. 最后,通过命令行提交任务到 Flink Standalone cluster
bash bin/flink-cdc.sh mysql-to-kafka.yaml
# 参考,一些自定义路径的示例  主要用于多版本flink,mysql驱动不一致等情况 如,
# bash /root/flink-cdc-3.3.0/bin/flink-cdc.sh /root/flink-cdc-3.3.0/bin/mysql-to-kafka.yaml --flink-home /root/flink-1.19. --jar /root/flink-cdc-3.3.0/lib/mysql-connector-java-8.0.27.jar
AI 代码解读

提交成功后,返回信息如:

Pipeline has been submitted to cluster.
Job ID: ba2afd0697524bd4857183936507b0bf
Job Description: MySQL to Kafka Pipeline
AI 代码解读

在 Flink Web UI,可以看到一个名为 MySQL to Kafka Pipeline 的任务正在运行。

可以通过kafka自带的客户端查看Topic情况,得到debezium-json格式的内容:

docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server 192.168.31.229:9092 --topic yaml-mysql-kafka --from-beginning
AI 代码解读

debezium-json 格式包含了 before,after,op,source 几个元素,展示示例如下:

{
    "before": null,
    "after": {
        "id": 1,
        "price": 4
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "orders"
    }
}
...
{
    "before": null,
    "after": {
        "id": 1,
        "product": "Beer"
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "products"
    }
}
...
{
    "before": null,
    "after": {
        "id": 2,
        "city": "xian"
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "shipments"
    }
}
AI 代码解读

同步变更

进入 MySQL 容器:

docker-compose exec MySQL mysql -uroot -p123456
AI 代码解读

接下来,修改 MySQL 数据库中表的数据,StarRocks 中显示的订单数据也将实时更新:

  1. 在 MySQL 的 orders 表中插入一条数据
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
AI 代码解读
  1. 在 MySQL 的 orders 表中增加一个字段
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
AI 代码解读
  1. 在 MySQL 的 orders 表中更新一条数据
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
AI 代码解读
  1. 在 MySQL 的 orders 表中删除一条数据
DELETE FROM app_db.orders WHERE id=2;
AI 代码解读

通过消费者监控 topic,我们可以看到 Kafka 上也在实时发生着这些变更:

{
    "before": {
        "id": 1,
        "price": 4,
        "amount": null
    },
    "after": {
        "id": 1,
        "price": 100,
        "amount": "100.00"
    },
    "op": "u",
    "source": {
        "db": "app_db",
        "table": "orders"
    }
}
AI 代码解读

同样地,去修改 shipments, products 表,也能在 Kafka对应的 topic 中实时看到同步变更的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。下面提供一个配置文件说明:

################################################################################
# Description: Sync MySQL all tables to Kafka
################################################################################
source:
  type: mysql
  hostname: 0.0.0.0
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: 0.0.0.0:9092
pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 1
route:
 - source-table: app_db.orders
   sink-table: kafka_ods_orders
 - source-table: app_db.shipments
   sink-table: kafka_ods_shipments
 - source-table: app_db.products
   sink-table: kafka_ods_products
AI 代码解读

通过上面的 route 配置,会将 app_db.orders 表的结构和数据同步到 kafka_ods_orders 中。从而实现数据库迁移的功能。特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:

route:
  - source-table: app_db.order\.*
    sink-table: kafka_ods_orders
AI 代码解读

这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 kafka_ods_orders 中。利用kafka自带的工具,可查看对应Topic成功建立,数据详情可使用kafka-console-consumer.sh进行查询:

docker-compose exec Kafka kafka-topics.sh --bootstrap-server 192.168.67.2:9092 --list
AI 代码解读

新创建的 Kafka Topic 信息如下:

__consumer_offsets
kafka_ods_orders
kafka_ods_products
kafka_ods_shipments
yaml-mysql-kafka
AI 代码解读

选取 kafka_ods_orders 这个 Topic 进行查询,返回数据示例如下:

{
    "before": null,
    "after": {
        "id": 1,
        "price": 100,
        "amount": "100.00"
    },
    "op": "c",
    "source": {
        "db": null,
        "table": "kafka_ods_orders"
    }
}
AI 代码解读

写入多个分区

使用 partition.strategy 参数可以定义发送数据到 Kafka 分区的策略, 可以设置的选项有:

  • `all-to-zero`(将所有数据发送到 0 号分区),默认值

  • `hash-by-key`(所有数据根据主键的哈希值分发)

我们基于mysql-to-kafka.yaml在 sink下定义一行partition.strategy: hash-by-key

source:
  ...
sink:
  ...
  topic: yaml-mysql-kafka-hash-by-key
  partition.strategy: hash-by-key
pipeline:
  ...
AI 代码解读

同时我们利用 Kafka 的脚本新建一个12分区的 kafka Topic:

docker-compose exec Kafka kafka-topics.sh --create --topic yaml-mysql-kafka-hash-by-key --bootstrap-server 192.168.67.2:9092  --partitions 12
AI 代码解读

提交yaml程序后,这个时候我们指定一下分区消费,查看一下各个分区里面所存储的数据。

docker-compose exec Kafka kafka-console-consumer.sh --bootstrap-server=192.168.67.2:9092  --topic yaml-mysql-kafka-hash-by-key  --partition 0  --from-beginning
AI 代码解读

部分分区数据详情如下:

# 分区0
{
   
    "before": null,
    "after": {
   
        "id": 1,
        "price": 100,
        "amount": "100.00"
    },
    "op": "c",
    "source": {
   
        "db": "app_db",
        "table": "orders"
    }
}
# 分区4
{
   
    "before": null,
    "after": {
   
        "id": 2,
        "product": "Cap"
    },
    "op": "c",
    "source": {
   
        "db": "app_db",
        "table": "products"
    }
}
{
   
    "before": null,
    "after": {
   
        "id": 1,
        "city": "beijing"
    },
    "op": "c",
    "source": {
   
        "db": "app_db",
        "table": "shipments"
    }
}
AI 代码解读

输出格式

value.format 参数用于序列化 Kafka 消息的值部分数据的格式。可选的填写值包括 debezium-jsoncanal-json, 默认值为 `debezium-json`,目前还不支持用户自定义输出格式。

  • debezium-json格式会包含 before(变更前的数据)/after(变更后的数据)/op(变更类型)/source(元数据) 几个元素,ts_ms 字段并不会默认包含在输出结构中(需要在 Source 中指定 metadata.list 配合)。

  • canal-json格式会包含 old/data/type/database/table/pkNames 几个元素,但是 ts 并不会默认包含在其中(原因同上)。

可以在 YAML 文件的 sink 中定义 value.format: canal-json 来指定输出格式为 canal-json 类型:

source:
  ...

sink:
  ...
  topic: yaml-mysql-kafka-canal
  value.format: canal-json
pipeline:
  ...
AI 代码解读

查询对应 Topic 的数据,返回示例如下:

{
   
    "old": null,
    "data": [
        {
   
            "id": 1,
            "price": 100,
            "amount": "100.00"
        }
    ],
    "type": "INSERT",
    "database": "app_db",
    "table": "orders",
    "pkNames": [
        "id"
    ]
}
AI 代码解读

上游表名到下游Topic名的映射关系

使用 sink.tableId-to-topic.mapping 参数可以指定上游表名到下游 Kafka Topic 名的映射关系。无需使用 route 配置。与之前介绍的通过 route 实现的不同点在于,配置该参数可以在保留源表的表名信息的情况下设置写入的 Topic 名称。

在前面的 YAML 文件中增加 sink.tableId-to-topic.mapping 配置指定映射关系,每个映射关系由 ; 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 : 分割:

source:
  ...

sink:
  ...
  sink.tableId-to-topic.mapping: app_db.orders:yaml-mysql-kafka-orders;app_db.shipments:yaml-mysql-kafka-shipments;app_db.products:yaml-mysql-kafka-products
pipeline:
  ...
AI 代码解读

运行后,Kafka 中将会生成如下的 Topic:

...
yaml-mysql-kafka-orders
yaml-mysql-kafka-products
yaml-mysql-kafka-shipments
AI 代码解读

Kafka 不同 Topic 中部分数据详情:

{
    "before": null,
    "after": {
        "id": 1,
        "price": 100,
        "amount": "100.00"
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "orders"
    }
}
AI 代码解读
{
    "before": null,
    "after": {
        "id": 2,
        "product": "Cap"
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "products"
    }
}
AI 代码解读
{
    "before": null,
    "after": {
        "id": 2,
        "city": "xian"
    },
    "op": "c",
    "source": {
        "db": "app_db",
        "table": "shipments"
    }
}
AI 代码解读

环境清理

本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down
AI 代码解读

在 Flink 所在目录 flink-1.19.2下执行如下命令停止 Flink 集群:

./bin/stop-cluster.sh
AI 代码解读

更多内容


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
2
2
0
1614
分享
相关文章
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
150 1
Amoro + Flink CDC 数据融合入湖新体验
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
430 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Dinky 和 Flink CDC 在实时整库同步的探索之路
本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
519 12
Dinky 和 Flink CDC 在实时整库同步的探索之路
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
242 1
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
MySQL功能模块探秘:数据库世界的奇妙之旅
]带你轻松愉快地探索MySQL 8.4.5的核心功能模块,从SQL引擎到存储引擎,从复制机制到插件系统,让你在欢声笑语中掌握数据库的精髓!
50 26
Go语言数据库编程:使用 `database/sql` 与 MySQL/PostgreSQL
Go语言通过`database/sql`标准库提供统一数据库操作接口,支持MySQL、PostgreSQL等多种数据库。本文介绍了驱动安装、连接数据库、基本增删改查操作、预处理语句、事务处理及错误管理等内容,涵盖实际开发中常用的技巧与注意事项,适合快速掌握Go语言数据库编程基础。
111 62
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
【YashanDB知识库】原生mysql驱动配置连接崖山数据库
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
本文详细介绍了在 MySQL 中创建数据库和表的方法。包括安装 MySQL、用命令行和图形化工具创建数据库、选择数据库、创建表(含数据类型介绍与选择建议、案例分析、最佳实践与注意事项)以及查看数据库和表的内容。文章专业、严谨且具可操作性,对数据管理有实际帮助。
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂

相关产品

  • 实时计算 Flink版
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等

    登录插画

    登录以查看您的控制台资源

    管理云资源
    状态一览
    快捷访问