我使用的是flink1.18.1+cdc2.4.0如何获取op这个元数据?

我使用flinkcdc对postgressql数据库的数据进行实时同步,但是我想要获取到op元数据信息写入到kafka,这个如何获取?

展开
收起
aliyun2181089008 2025-12-26 17:29:51 20 分享 版权
1 条回答
写回答
取消 提交回答
  • Blueberry King

    在 Flink 1.18.1 + Flink CDC 2.4.0(PostgreSQL) 中:
    op 本身不是 Flink Table 的物理字段,
    而是 CDC 在变更事件中携带的语义信息,
    你只能通过 Changelog / RowKind / metadata column 来获取。
    不同使用方式,获取方式不同。

    场景一:使用 Flink SQL(最常见,也是推荐方式)
    ✅ 正确做法:使用 metadata column + RowKind
    Flink CDC 2.4.x 已经支持 将操作类型映射为 metadata column。

    1️⃣ 建表时声明 metadata 字段
    CREATE TABLE pg_source (
    id INT,
    name STRING,
    op STRING METADATA FROM 'op' VIRTUAL
    ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'xxx',
    'port' = '5432',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'xxx',
    'schema-name' = 'public',
    'table-name' = 'test'
    );

    此时:
    op 的取值为:
    c → insert
    u → update
    d → delete
    r → snapshot read

    2️⃣ 写入 Kafka(直接透传)
    INSERT INTO kafka_sink
    SELECT
    id,
    name,
    op
    FROM pg_source;
    Kafka 中就可以直接消费到 op 字段。

    场景二:不用 metadata,而是通过 RowKind 判断(底层原理)
    如果你不声明 op metadata,本质上 CDC 事件仍然是 Changelog Stream:
    RowKind ----含义
    INSERT ----insert / snapshot
    UPDATE_BEFORE---- update before
    UPDATE_AFTER ----update after
    DELETE ----delete
    可以通过 SQL 映射:
    SELECT
    *,
    CASE
    WHEN ROW_KIND() = 'INSERT' THEN 'c'
    WHEN ROW_KIND() = 'UPDATE_AFTER' THEN 'u'
    WHEN ROW_KIND() = 'DELETE' THEN 'd'
    END AS op
    FROM pg_source;
    注意:
    update 会产生 BEFORE + AFTER 两条记录,一般只取 UPDATE_AFTER。

    关于 PostgreSQL CDC 的
    PostgreSQL CDC update = before + after
    如果你:
    直接写 Kafka
    下游不理解 changelog
    一定要 过滤 UPDATE_BEFORE,否则会误消费。

    2025-12-26 23:17:06
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理