Flink CDC里mysql-cdc不支持 元数据读取 row_kind 操作类型么?SQL validation failed. Invalid metadata key 'row_kind' in column 'operation' of table 'vvp.default.source_steam_inventory_fingerprint'. The DynamicTableSource class 'com.ververica.cdc.connectors.mysql.table.MySqlTableSource' supports the following metadata keys for reading:
table_name
database_name
op_ts
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中,mysql-cdc
连接器确实支持元数据读取功能,但其支持的元数据键是有限的。根据知识库中的描述,mysql-cdc
源表支持以下元数据键用于读取:
database_name
:包含该行记录的库名。table_name
:包含该行记录的表名。op_ts
:该行记录在数据库中的变更时间。如果记录来自表的存量历史数据而不是从Binlog中获取,则该值总是0。op_type
:该行记录的变更类型(例如+I
表示INSERT,-D
表示DELETE,-U
表示UPDATE_BEFORE,+U
表示UPDATE_AFTER)。然而,您提到的row_kind
并不在上述支持的元数据键列表中。因此,当您尝试在SQL中使用row_kind
作为元数据键时,会导致SQL验证失败,并提示类似以下错误信息:
SQL validation failed. Invalid metadata key 'row_kind' in column 'operation' of table 'vvp.default.source_steam_inventory_fingerprint'. The DynamicTableSource class 'com.ververica.cdc.connectors.mysql.table.MySqlTableSource' supports the following metadata keys for reading: table_name, database_name, op_ts
如果您需要获取类似于row_kind
的操作类型信息,可以通过op_type
元数据列来实现。op_type
提供了与row_kind
类似的功能,能够标识每条记录的变更类型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)。以下是具体的实现方式:
op_type
元数据列在定义MySQL CDC源表时,可以通过METADATA
语法显式声明op_type
列,以读取操作类型信息。示例如下:
CREATE TEMPORARY TABLE mysql_source_table (
db_name STRING METADATA FROM 'database_name' VIRTUAL, -- 读取库名
table_name STRING METADATA FROM 'table_name' VIRTUAL, -- 读取表名
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 读取变更时间
op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 读取变更类型
id INT,
name STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your-mysql-host',
'port' = '3306',
'username' = 'your-username',
'password' = 'your-password',
'database-name' = 'your-database',
'table-name' = 'your-table'
);
在此示例中,op_type
列将包含每条记录的操作类型信息,您可以根据该列的值进行后续处理。
row_kind
的逻辑处理如果您需要在下游处理中使用row_kind
类似的逻辑,可以通过op_type
列的值进行映射。例如:
+I
对应 INSERT
-D
对应 DELETE
-U
对应 UPDATE_BEFORE
+U
对应 UPDATE_AFTER
您可以在Flink SQL中通过CASE WHEN
语句对op_type
进行转换,以适配您的业务逻辑。示例如下:
SELECT
id,
name,
CASE
WHEN op_type = '+I' THEN 'INSERT'
WHEN op_type = '-D' THEN 'DELETE'
WHEN op_type = '-U' THEN 'UPDATE_BEFORE'
WHEN op_type = '+U' THEN 'UPDATE_AFTER'
END AS row_kind
FROM mysql_source_table;
op_type
元数据列的支持从vvr-4.0.11-flink-1.13
版本开始提供,确保您的Flink版本满足要求。row_kind
:mysql-cdc
连接器目前并未直接支持row_kind
元数据键,因此无法通过METADATA
语法直接读取该字段。mysql-cdc
连接器不支持直接读取row_kind
元数据键,但可以通过op_type
元数据列实现类似功能。通过显式声明op_type
列并结合逻辑处理,您可以满足业务需求。如果仍有疑问,请进一步确认您的Flink版本和具体业务场景,以便提供更精确的支持。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。