Flink CDC里 CDC 3.0怎么配置transform操作?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC 3.0中,transform
操作用于对流经Flink管道的数据进行处理和转换。通过配置YAML文件中的transform
模块,可以实现数据的投影、过滤、列裁剪、计算列等功能。以下是详细的配置方法和步骤:
transform
模块的语法结构如下:
transform:
- source-table: <上游表名>
projection: <投影规则>
filter: <过滤规则>
primary-keys: <主键列表>
partition-keys: <分区键列表>
table-options: <额外配置信息>
description: <描述信息>
converter-after-transform: <转换器>
以下是transform
模块中各配置项的具体含义及使用方法:
source-table
source-table: db.tbl1
projection
SELECT
语句。projection: *
。projection: id, name
projection: id, id + 1 AS inc_id
filter
WHERE
语句。filter: age > 18
primary-keys
primary-keys: id
partition-keys
partition-keys: region
table-options
key=value
形式,多个配置项用逗号分隔。table-options: bucket=10, comment='example'
description
transform
块添加描述信息。converter-after-transform
transform
后对数据进行额外处理的转换器。SOFT_DELETE
(VVR 8.0.11及以上版本支持)。converter-after-transform: SOFT_DELETE
以下是一个完整的transform
模块配置示例,展示如何对数据进行投影、过滤和计算列操作:
transform:
- source-table: db.tbl1
projection: id, name, salary * 1.1 AS adjusted_salary
filter: salary > 5000
primary-keys: id
partition-keys: department
table-options: bucket=5
description: Adjust salaries and filter high earners
计算列限制:计算列的表达式不能引用其他计算列的值,即使被引用的列出现在该计算列之前。例如,以下配置是非法的:
projection: a, b AS c, c AS d
原因:c
依赖于b
,而d
又依赖于c
,这会导致解析错误。
表结构变更同步:如果需要将上游表结构变更自动同步到下游,必须显式定义projection: *
,否则可能导致下游表结构不同步。
性能优化:在增量读取阶段,Flink CDC 3.0会自动关闭空闲读取器以节省计算资源,因此无需手动管理连接。
通过上述配置,您可以灵活地使用Flink CDC 3.0的transform
模块对数据进行处理和转换。根据实际需求,合理配置projection
、filter
等参数,能够满足复杂的数据同步场景。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。