摘要:本文撰写自阿里云开源大数据平台数据通道团队,主要介绍了 Flink CDC YAML 在实时计算Flink版的最佳实践。内容分为以下五个部分:
- CDC YAML 简介
- CDC YAML 核心能力
- CDC YAML 应用场景
- 阿里云 Flink CDC 企业级功能
- 十分钟在阿里云免费实现一个 CDC YAML 作业
CDC YAML 简介
CDC YAML 是 Flink CDC [1] 提供的简单易用的数据集成 API,用于帮助用户快速构建功能强大的数据同步链路,实时地同步业务数据库中的数据变更和表结构变更实时同步到数据仓库,数据湖以及其他下游系统。CDC YAML 上手门槛较低,即使没有研发背景和Flink基础,用户也可以较快地完成数据的同步和ETL加工,快速完成数据的实时入湖入仓,加速数据分析效率。
阿里云实时计算 Flink 版基于 Flink CDC 提供了数据摄入 CDC YAML 开发[2],通过开发 YAML 作业的方式有效地实现将数据从源端同步到目标端的数据摄入工作,帮助用户在云上高效完成数据入湖入仓。
CDC YAML 核心能力
CDC YAML 主要用于同步数据库变更到其他系统,同步前支持对数据进行简单处理和清洗,帮助用户完成秒级的数据同步工作,构建入湖入仓作业,主要的核心能力有以下几个方面:
端到端 Data Pipeline:支持秒级同步数据变更和结构变更到其他系统,用户可以快速构建自己的数据湖和数据仓库,为数据分析提供基础。
细粒度 Schema Evolution:出于数据安全的考虑,部分高危操作(如删除表,清空数据等)用户不希望同步到目标端。CDC YAML 提供细粒度 Schema 变更的能力,帮助用户限制可同步的数据变更类型。
全增量一体化 CDC 读取:CDC YAML提供了全量和增量的一体化读取能力,全量数据读取完成后自动切换增量读取,无需用户操作。
丰富的 Transform 支持 :支持对数据字段进行处理后同步到下游,如添加额外计算列,添加元数据、只同步某些列、重新指定主键或分区键等场景。CDC YAML 内置了丰富的函数,用户也可以自行开发UDF,兼容 Flink 的 UDF。支持对数据源数据进行过滤,跳过不需要的数据,完成对无用数据和脏数据的清洗工作。
灵活的 Route 策略控制:支持自定义数据源的表同步到目标端表的映射关系,支持一对一、一对多和多对一的多种映射关系,帮助用户灵活指定目标端表名,支持分库分表合并的场景。
完善的 作业 Metric 支持:为了便于判断作业运行的阶段和状态,CDC YAML 提供了丰富的指标。如全量阶段未处理/已处理的表数量,全量阶段未处理/已处理的分片数量、最新一条数据的时间戳等。
阿里云实时计算 Flink 结合用户需求,YAML 作业支持了更多的上下游,支持同步到常见的数据湖和数据仓库,已经支持的上下游如下。
连接器 | 支持类型 | |
---|---|---|
Source | Sink | |
MySQL | √ | × |
消息队列Kafka | √ | √ |
实时数仓Hologres | × | √ |
Upsert Kafka | × | √ |
× | √ | |
StarRocks | × | √ |
流式数据湖仓Paimon | × | √ |
YAML 作业 与 SQL / DataStream 作业对比
Flink 提供了两种级别的作业开发方式:SQL 和 DataStream,下面会比较一下相比于 SQL 和 DataStream来说 CDC YAML 开发有什么优势。
CDC YAML 作业相比 SQL 主要有以下一些优势:
数据摄入YAML | SQL |
---|---|
自动识别 Schema,支持整库同步 | 需要人工写 Create Table 和 Insert 语句 |
支持细粒度 Schema 变更 | 不支持 Schema 变更 |
支持原始 Changelog 同步 | 破坏原始 Changelog 结构 |
支持读写多个表 | 读写单个表 |
CDC YAML 作业相比 DataStream 作业的优势如下:
数据摄入YAML | DataStream |
---|---|
为各级别用户设计,不只是专家 | 需要熟悉Java和分布式系统 |
隐藏底层细节,便于开发 | 需要熟悉Flink框架 |
YAML格式容易理解和学习 | 需要了解Maven等工具管理相关依赖 |
已有作业方便复用 | 难以复用已有代码 |
CDC YAML 应用场景
CDC YAML 能够支持用户数据同步多种应用场景,下面简单介绍一些常见的使用场景,以及应该如何使用 CDC YAML 解决这些问题。
整库同步,构建数据湖仓
整库同步是数据同步最常见的一种使用场景,将存在数据库的数据同步到数据湖或数据仓库中,为后续的数据分析提供基础。
如下的数据摄入YAML作业可以完成同步整个app_db数据库到Paimon的工作,快速完成数据入湖的同步工作。
source:
type: mysql
name: MySQL Source
hostname: ${
secret_values.mysql-hostname}
port: 3306
username: flink
password: ${
secret_values.mysql-password}
tables: app_db.\.*
server-id: 18601-18604
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: ${
secret_values.test_ak}
catalog.properties.fs.oss.accessKeySecret: ${
secret_values.test_sk}
在整库同步中,有时需要重新定义同步到目标端的表名,防止发生冲突。或者需要做一些简单的数据处理和数据过滤工作,这时需要结合 transform 模块和 route 模块完成,transform 模块负责数据处理工作,route 模块负责数据分发工作。
例如在如下的整库同步作业中,为 app_db.customers 表添加额外的计算列 upper 和数据库名这个元数据列db,同时在目标端为三张表名添加版本后缀。
source:
type: mysql
name: MySQL Source
hostname: ${
secret_values.mysql-hostname}
port: 3306
username: flink
password: ${
secret_values.mysql-password}
tables: app_db.\.*
server-id: 18601-18604
transform:
- source-table: app_db.customers
projection: \*, UPPER(`name`) AS upper, __schema_name__ AS db
route:
- source-table: app_db.customers
sink-table: app_db.customers_v1
- source-table: app_db.products
sink-table: app_db.products_v0
- source-table: app_db.orders
sink-table: app_db.orders_v0
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: ${
secret_values.test_ak}
catalog.properties.fs.oss.accessKeySecret: ${
secret_values.test_sk}
CDC YAML提供了丰富的内置函数,还支持用户自定义函数来完成复杂的数据处理。CDC YAML 对Flink SQL 的自定义函数进行了兼容,大部分的 Flink 自定义函数可以在数据摄入YAML中直接使用。
分库分表合并
在高并发和大数据量场景下,用户可能选择将一个表拆分为多个库的多张表存储数据,对于分库分表在分析数据前,希望将数据合并为数据湖仓中的一张表。
假设app_db数据库中只有customers_v0,customers_v1和customers_v2三张表,如下 CDC YAML 作业可以将这三张分表合并为一张表customers,完成分库分表的数据同步。
source:
type: mysql
name: MySQL Source
hostname: ${
secret_values.mysql-hostname}
port: 3306
username: flink
password: ${
secret_values.mysql-password}
tables: app_db.customers\.*
server-id: 18601-18604
route:
- source-table: app_db.customers\.*
sink-table: app_db.customers
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: ${
secret_values.test_ak}
catalog.properties.fs.oss.accessKeySecret: ${
secret_values.test_sk}
原始 Binlog 数据同步 Kafka,对接已有系统
用户除了整库整表同步,有些场景下需要获取到详细的 Changelog 变更历史,而不是变更后的数据。将 Binlog 同步到 Kafka,结合分布式消息队列 Kafka 可以提高数据消费速度,解决消费同一个 Binlog 导致的数据瓶颈。后续能够使用 Kafka 里的 Binlog 历史, 进行数据回放和数据审计工作,或者消费 Kafka 实时监控数据变更历史,从而触发通知和报警。
由于 Flink SQL 作业里传递的数据结构是 RowData,会将一条 Update 操作拆分为 Update before 和 Update after 两条消息发送,破坏了 Changelog 原有的结构,无法完成原始 Binlog 数据同步。CDC YAML 使用了 SchemaChangeEvent 和 DataChangeEvent 传递数据,可以完整保留 Changelog,帮助用户完成同步原始 Binlog 数据到 Kafka 的工作。
如下作业可以将数据库 app_db 的变更历史同步到 Kafka,app_db 数据库下的表 customers、products 和 shipments 的变更会各自写入对应的 topic 中。
source:
type: mysql
name: MySQL Source
hostname: ${
secret_values.mysql-hostname}
port: 3306
username: flink
password: ${
secret_values.mysql-password}
tables: app_db.\.*
server-id: 18601-18604
metadata-column.include-list: op_ts
sink:
type: Kafka
name: Kafka Sink
properties.bootstrap.servers: ${
secret_values.bootstraps-server}
properties.enable.idempotence: false
存储时支持使用 debezium-json(默认) 或 canal-json 格式,使用 debezium 或 canal 的历史同步作业可以平滑切换为使用 Flink CDC YAML 进行同步,下游的消费者无需修改逻辑。以一条Update消息为例,debezium-json 和 canal-json 数据格式分别如下。
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}
细粒度 Schema 变更
CDC YAML 作业支持同步数据源的 Schema 变更到目标端,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。但是下游目标端可能无法支持全部类型的变更,或者出于数据安全和权限的考虑,不希望将全部的变更同步到下游。比如希望保留全部历史数据,不希望执行删除和清空等操作。
为了满足更多的用户场景,数据摄入YAML提供了多种变更策略:
LENIENT(默认):按照固定的模式,将部分类型的变更转换后同步或跳过同步,确保 Schema Evolution 的向后兼容性。
EXCEPTION:不允许变更行为,发生变更时作业抛出异常。
IGNORE:跳过全部变更。
EVOLVE:同步所有变更,同步失败作业抛出异常。
TRY_EVOLVE:尝试同步变更,目标端不支持变更时不报错。
如果不同的变更策略无法满足需求,数据摄入YAML还提供了更细粒度的调控配置,在 Sink 模块中设置 include.schema.changes 和 exclude.schema.changes 选项可以控制需要同步的和需要过滤的变更类型。
如下作业使用 EVOLVE 模式,可以正常同步全部变更,但是会跳过同步删除表,删除列和清空表的操作。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${
secret_values.holo-username}
password: ${
secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE
宽容模式同步
Schema 变更同步相比于数据同步可能耗时更多,因为在多并发情况下需要等全部数据都写出后,才可以安全地进行 Schema 变更,而且下游的目标端可能无法支持重现全部的 Schema 变更。
为了作业能够更宽容地处理这些变更,在 Hologres 目标端支持了宽容模式同步。Hologres 不支持变更列的类型,在宽容模式下,CDC YAML 将多个 MySQL 数据类型映射到更宽的 Hologres 类型,跳过不必要的类型变更事件,从而让作业正常运行,可以通过配置项sink.type-normalize-strategy
进行更改。
例如,如下作业使用 ONLY_BIGINT_OR_TEXT 让 MySQL 类型只对应到 Hologres 的 int8 和 text 两种类型。如果 MySQL 某个列类型从 INT 改为 BIGINT ,Hologres 将这两种 MySQL 类型都对应到 int8 类型,作业不会因为无法处理类型转换而报错。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${
secret_values.holo-username}
password: ${
secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
新增同步表
在一些业务场景企业开发了新业务模块(如会员系统、积分系统),需要新增数据库表,并将其数据同步到现有的数据仓库、数据湖或实时计算平台中。或者由于业务调整变化,作业启动时未同步的表需要同步到数据湖仓。
对于在运行中的作业,这些新表就是新增表。新增表存在两种不同的场景,CDC YAML 对不同新增表场景需要使用不同的处理方式,不需要新增作业。
如果新增加的表是空表,由于数据都是新插入的数据,因此不需要同步历史数据,可以在 mysql source 模块上设置 scan.binlog.newly-added-table.enabled=true。在这种场景下,被 CDC YAML 作业匹配的新创建的表会自动同步到目标端。
如果新增加的表是作业启动前存在的表,客户希望表里的历史数据需要同步,可以在 mysql source 模块上设置 scan.newly-added-table.enabled=true,然后从 savepoint 重启作业。
指定同步位点重跑数据
运行中的 CDC YAML 作业可能因为一些预期外的错误而退出,比如 Binlog 过期清理,无法解析的 Binlog 内容,解析代码的 Bug 等,这些错误会导致作业无法从原有位置恢复。CDC YAML 支持使用指定位点启动作业,通过修正部分数据 + 指定位点的方式,可以帮助作业继续运行。
阿里云 Flink CDC 企业级功能
阿里云实时计算 Flink 版在支持开源Flink CDC的所有功能外,还结合企业级客户的需求和场景,提供了以下企业级特性,帮助云上企业更好地完成数据实时化改造。
MySQL CDC 企业级性能优化
MySQL CDC 消费 Binlog 是单并行度运行的,消费性能存在瓶颈,阿里云实时计算 Flink 版数据摄入 YAML 对MySQL CDC 消费性能进行了大幅优化:
Debezium Bump参数优化:Debezium 读取数据时,一些参数可以适当调整以获取更好的性能。该方式对比开源 Flink CDC 可以提高11%的性能。
过滤无关表数据:MySQL CDC 消费整个实例的 Binlog,跳过不匹配的表的数据可以加速解析。该方式提升的性能取决于无关表数据的占比。
并行解析 Binlog:Binlog 解析字节流时,可以从单并发优化为多并发加速解析速度。该方式对比开源 Flink CDC 可以提高14%的性能。
并行序列化:通过火焰图发现 CPU 在完成从 Event 到 SourceRecord 和从 SourceRecord 到 JSON 的序列化过程中耗时较多,优化为并行序列化并保序可以提高性能。该方式对比开源 Flink CDC 可以提高42%的性能。
结合以上 4 种优化方式,实时计算 Flink 版数据摄入 YAML 相比于社区 Flink CDC 来说,如果 Binlog 只有单个表的数据,普适的性能会提升 80%左右;如果 Binlog 包含多个表的数据且 YAML 作业只需要同步部分表,则可以获得 10 倍左右的性能提升。
OSS 持久化 binlog 消费支持
MySQL 的数据库实例只有一份 Binlog,如果数据更新太快很可能导致消费速度赶不上生产速度,从而 Binlog 日志被清理,无法从消费失败的位置指定位点重启作业。
阿里云 RDS MySQL 实例支持将 Binlog 同步到 OSS,MySQL CDC 可以使用这部分离线的日志启动作业。用户使用 RDS MySQL 作为上游时,可以指定对应 OSS 配置,当指定的时间戳或者 Binlog 位点对应的文件保存在 OSS 时,会自动拉取 OSS 日志文件到 Flink 集群本地进行读取,当指定的时间戳或者 Binlog 位点对应的文件保存在数据库本地时,会自动切换使用数据库连接读取,彻底解决 Binlog 日志过期导致的作业重跑或数据不一致问题。
更丰富的监控指标
为了便于判断作业运行的阶段和状态,商业版提供更丰富的监控指标。
当前作业处于全量或增量阶段
全量阶段未处理/已处理的表数量
全量阶段未处理/已处理的分片数量
最新一条数据的时间戳
读取数据的延迟
全量阶段的消息条数/全量阶段每个表的消息条数
消息总条数/每个表的消息条数
十分钟在阿里云免费实现一个 CDC YAML 作业
接下来我们使用阿里云免费试用 [3] 来快速测试一下 CDC YAML 作业的功能,完成一个简单的 MySQL 到 Paimon 的整库同步作业。
资源准备
开始测试前需要准备好一个 RDS MySQL 实例,一个实时计算 Flink 版环境,一个 OSS 对象存储。
OSS 对象存储
OSS 对象存储用作数据湖存储,并且用在实时计算 Flink 版的 checkpoint 存储。
在免费试用搜索oss,点击立即试用对象存储 OSS。
试用成功后,在 OSS 控制台创建一个杭州地区的Bucket。创建成功后,在新建的 Bucket 的文件列表中新建目录 warehouse,用于存储数据湖数据。
RDS MySQL 实例
RDS MySQL 作为测试的 MySQL 数据源。
在免费试用搜索RDS MySQL,点击立即试用云数据库 RDS MySQL。
试用成功后,在 RDS 控制台创建一个杭州地区的RDS MySQL集群。
在实例列表点击进入刚创建的实例,在账号管理创建一个高权限账号。
账号创建完成后,在数据库管理创建一个数据库用于测试。
点击登录数据库,使用用户名和密码登录,然后在数据库下创建一些表并插入测试数据。此处创建了products 和 users 两张表,每个表各生成 5 条测试数据。
在白名单与安全组点击全部开放,打通网络连接。此处因为测试目的使用了全部开放,在生产环境请合理配置白名单。
实时计算 Flink 版
在免费试用搜索 flink,点击立即试用实时计算 Flink 版。
完成RAM授权并领取资源抵扣包后,在杭州地区与RDS相同的可用区创建实时计算 Flink 版实例。
创建 AccessKey
Paimon 访问 OSS 时需要使用 AccessKey,参照创建AccessKey文档[4]创建 AccessKey。
CDC YAML 整库同步 Paimon
当所需资源和测试数据都准备好后,接下来让我们在实时计算 Flink 版本上快速开发整库同步作业。
为了数据安全,可以将需要使用的信息在变量管理用密文保存。
保存好变量后,在数据摄入中创建一个整库同步作业并部署上线。
部署成功后,在作业运维点击启动按钮启动作业。
作业启动后,可以在监控告警的数据摄入看到 CDC YAML 同步状态。如下监控可以看出测试作业已经进入了增量阶段,一共同步了2张表,2个分片,每个表全量分别同步了5条数据。
为了查看测试数据,可以使用数据开发 ETL 的调试功能。
首先在元数据管理创建对应的Paimon Catalog。
创建一个 Session 集群用于运行查看数据结果的 SQL,注意选择数据摄入支持的引擎版本。
在数据开发 ETL 中,创建一个 select SQL 作业查看数据,点击调试在 Session 集群运行,可以在控制台查看数据结果。
至此一个完整的业务案例就已经实现了,接下来可以自由地在 RDS MySQL 数据库侧操作相应的数据修改,重新执行 select 命令查看数据时,变能够观察到通过YAML作业实时同步到Paimon中的数据了。
相关链接
[1] https://nightlies.apache.org/flink/flink-cdc-docs-stable/
欢迎大家多多关注 Flink CDC,从钉钉用户交流群[1]、微信公众号[2]、Slack 频道[3]、邮件列表[4]加入 CDC 用户社区,以及在 Flink CDC GitHub 仓库[5]上参与代码贡献!
[1] “ Flink CDC 社区 ② 群”群的钉钉群号:80655011780
[2] ” Flink CDC 公众号“的微信号:ApacheFlinkCDC
[3] https://flink.apache.org/what-is-flink/community/#slack
[4] https://flink.apache.org/what-is-flink/community/#mailing-lists
[5] https://github.com/apache/flink-cdc
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc