我使用flinkcdc对postgressql数据库的数据进行实时同步,但是我想要获取到op元数据信息写入到kafka,这个如何获取?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在 Flink 1.18.1 + Flink CDC 2.4.0(PostgreSQL) 中:
op 本身不是 Flink Table 的物理字段,
而是 CDC 在变更事件中携带的语义信息,
你只能通过 Changelog / RowKind / metadata column 来获取。
不同使用方式,获取方式不同。
场景一:使用 Flink SQL(最常见,也是推荐方式)
✅ 正确做法:使用 metadata column + RowKind
Flink CDC 2.4.x 已经支持 将操作类型映射为 metadata column。
1️⃣ 建表时声明 metadata 字段
CREATE TABLE pg_source (
id INT,
name STRING,
op STRING METADATA FROM 'op' VIRTUAL
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'xxx',
'port' = '5432',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxx',
'schema-name' = 'public',
'table-name' = 'test'
);
此时:
op 的取值为:
c → insert
u → update
d → delete
r → snapshot read
2️⃣ 写入 Kafka(直接透传)
INSERT INTO kafka_sink
SELECT
id,
name,
op
FROM pg_source;
Kafka 中就可以直接消费到 op 字段。
场景二:不用 metadata,而是通过 RowKind 判断(底层原理)
如果你不声明 op metadata,本质上 CDC 事件仍然是 Changelog Stream:
RowKind ----含义
INSERT ----insert / snapshot
UPDATE_BEFORE---- update before
UPDATE_AFTER ----update after
DELETE ----delete
可以通过 SQL 映射:
SELECT
*,
CASE
WHEN ROW_KIND() = 'INSERT' THEN 'c'
WHEN ROW_KIND() = 'UPDATE_AFTER' THEN 'u'
WHEN ROW_KIND() = 'DELETE' THEN 'd'
END AS op
FROM pg_source;
注意:
update 会产生 BEFORE + AFTER 两条记录,一般只取 UPDATE_AFTER。
关于 PostgreSQL CDC 的
PostgreSQL CDC update = before + after
如果你:
直接写 Kafka
下游不理解 changelog
一定要 过滤 UPDATE_BEFORE,否则会误消费。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。