Flink CDC中postgresql cdc打印的第二张图,我对数据进行修改,感觉打印出来的before数据不对,after是对的,能帮忙看看么?
211修改为266
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,PostgreSQL CDC在Flink中打印的before
数据不正确,而after
数据是正确的。这种情况可能与PostgreSQL表的REPLICA IDENTITY
设置有关。以下是详细的分析和解决方案:
在PostgreSQL中,REPLICA IDENTITY
决定了逻辑解码插件在捕获变更事件(如UPDATE
或DELETE
)时是否包含涉及列的旧值。如果REPLICA IDENTITY
未正确配置为FULL
,则可能导致以下问题: - before
字段缺失或不完整:默认情况下,REPLICA IDENTITY
的值为DEFAULT
,此时before
字段仅包含主键列的旧值,其他列的旧值将为空。 - after
字段正常:after
字段始终包含更新后的完整数据,因此不受REPLICA IDENTITY
的影响。
根据知识库中的说明,如果REPLICA IDENTITY
未设置为FULL
,Flink SQL将无法正确解析before
字段的数据。
为了确保before
字段能够正确打印出完整的旧值,请按照以下步骤检查并调整PostgreSQL表的配置:
REPLICA IDENTITY
设置运行以下SQL语句,查看目标表的REPLICA IDENTITY
设置:
SELECT relname, relreplident
FROM pg_class
WHERE relname = '<yourTableName>';
d
(表示DEFAULT
),则需要修改为FULL
。f
(表示FULL
),则无需修改。REPLICA IDENTITY
为FULL
运行以下SQL语句,将表的REPLICA IDENTITY
设置为FULL
:
ALTER TABLE <yourTableName> REPLICA IDENTITY FULL;
此操作会确保UPDATE
和DELETE
事件的before
字段包含所有列的旧值。
再次运行以下SQL语句,确认REPLICA IDENTITY
已成功设置为FULL
:
SELECT relname, relreplident
FROM pg_class
WHERE relname = '<yourTableName>';
返回值应为f
。
REPLICA IDENTITY
设置为FULL
可能会增加WAL日志的大小,因为每次更新操作都会记录所有列的旧值。请根据实际业务需求权衡性能与数据完整性之间的关系。ALTER TABLE
操作的用户需要具有表的所有者权限或超级用户权限。REPLICA IDENTITY FULL
功能(通常9.6及以上版本均支持)。完成上述配置后,重新运行Flink作业并观察CDC输出。如果REPLICA IDENTITY
已正确设置为FULL
,before
字段应能正确打印出完整的旧值。例如:
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 211
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 266
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
如果问题仍未解决,请检查以下内容: - Debezium配置:确保Flink CDC连接器的debezium.snapshot.mode
参数未设置为never
,否则可能导致快照阶段的数据不一致。 - 网络延迟或故障:检查PostgreSQL与Flink之间的网络连通性,确保没有数据丢失或延迟。
通过以上步骤,您可以有效解决before
字段数据不正确的问题。如果仍有疑问,请提供更多上下文信息以便进一步排查。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。