Flink CDC中postgresql cdc打印的第二张图,能帮忙看看么?

Flink CDC中postgresql cdc打印的第二张图,我对数据进行修改,感觉打印出来的before数据不对,after是对的,能帮忙看看么?image.png
image.png
image.png
211修改为266

展开
收起
真的很搞笑 2023-10-04 07:00:37 64 分享 版权
阿里云 AI 助理回答

根据您描述的问题,PostgreSQL CDC在Flink中打印的before数据不正确,而after数据是正确的。这种情况可能与PostgreSQL表的REPLICA IDENTITY设置有关。以下是详细的分析和解决方案:


1. 问题原因分析

在PostgreSQL中,REPLICA IDENTITY决定了逻辑解码插件在捕获变更事件(如UPDATEDELETE)时是否包含涉及列的旧值。如果REPLICA IDENTITY未正确配置为FULL,则可能导致以下问题: - before字段缺失或不完整:默认情况下,REPLICA IDENTITY的值为DEFAULT,此时before字段仅包含主键列的旧值,其他列的旧值将为空。 - after字段正常after字段始终包含更新后的完整数据,因此不受REPLICA IDENTITY的影响。

根据知识库中的说明,如果REPLICA IDENTITY未设置为FULL,Flink SQL将无法正确解析before字段的数据。


2. 解决方案

为了确保before字段能够正确打印出完整的旧值,请按照以下步骤检查并调整PostgreSQL表的配置:

步骤 1:检查当前表的REPLICA IDENTITY设置

运行以下SQL语句,查看目标表的REPLICA IDENTITY设置:

SELECT relname, relreplident 
FROM pg_class 
WHERE relname = '<yourTableName>';
  • 如果返回值为d(表示DEFAULT),则需要修改为FULL
  • 如果返回值为f(表示FULL),则无需修改。

步骤 2:修改表的REPLICA IDENTITYFULL

运行以下SQL语句,将表的REPLICA IDENTITY设置为FULL

ALTER TABLE <yourTableName> REPLICA IDENTITY FULL;

此操作会确保UPDATEDELETE事件的before字段包含所有列的旧值。

步骤 3:验证修改结果

再次运行以下SQL语句,确认REPLICA IDENTITY已成功设置为FULL

SELECT relname, relreplident 
FROM pg_class 
WHERE relname = '<yourTableName>';

返回值应为f


3. 注意事项

  • 性能影响:将REPLICA IDENTITY设置为FULL可能会增加WAL日志的大小,因为每次更新操作都会记录所有列的旧值。请根据实际业务需求权衡性能与数据完整性之间的关系。
  • 权限要求:执行ALTER TABLE操作的用户需要具有表的所有者权限或超级用户权限。
  • 兼容性:确保PostgreSQL版本支持REPLICA IDENTITY FULL功能(通常9.6及以上版本均支持)。

4. 验证Flink CDC输出

完成上述配置后,重新运行Flink作业并观察CDC输出。如果REPLICA IDENTITY已正确设置为FULLbefore字段应能正确打印出完整的旧值。例如:

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 211
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 266
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

5. 其他可能的原因

如果问题仍未解决,请检查以下内容: - Debezium配置:确保Flink CDC连接器的debezium.snapshot.mode参数未设置为never,否则可能导致快照阶段的数据不一致。 - 网络延迟或故障:检查PostgreSQL与Flink之间的网络连通性,确保没有数据丢失或延迟。


通过以上步骤,您可以有效解决before字段数据不正确的问题。如果仍有疑问,请提供更多上下文信息以便进一步排查。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理