Flink CDC里mongo cdc 从checkpoint启动,发现又同步全量数据了,是什么情况?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink CDC的MongoDB CDC连接器时,如果从Checkpoint启动后发现又同步了全量数据,可能是以下原因导致的:
MongoDB CDC通过Checkpoint记录Change Stream的Resume Token来实现从断点恢复。如果Resume Token对应的记录在oplog.rs
(MongoDB变更日志集合)中已经被清理或不存在,则会导致无法从增量阶段恢复,从而重新触发全量同步。
oplog.rs
的大小:确保oplog.rs
的保留时间足够长,避免Resume Token对应的记录被过早清理。可以通过MongoDB的配置参数调整oplog.rs
的大小,详情请参考MongoDB官方文档。如果Checkpoint未能正确保存Resume Token,或者Checkpoint本身损坏,也会导致从Checkpoint启动时无法恢复增量同步,从而重新触发全量同步。
execution.checkpointing.interval
和execution.checkpointing.tolerable-failed-checkpoints
等参数是否合理。MongoDB CDC依赖于Change Stream功能,而Change Stream需要MongoDB运行在副本集或分片集群架构下。如果MongoDB运行在单节点架构下,可能会导致增量同步失败,从而重新触发全量同步。
rs.initiate()
命令初始化为单节点副本集。The $changeStream stage is only supported on replica sets
的错误,说明MongoDB运行模式不满足要求。如果在全量同步阶段发生了表结构变更,或者新增了表,可能会导致作业无法正常从增量阶段恢复,从而重新触发全量同步。
scan.newly-added-table.enabled
或scan.binlog.newly-added-table.enabled
配置项,但需注意两者不建议同时开启,否则可能导致数据重复问题。从Checkpoint启动后重新同步全量数据的问题,通常与Resume Token失效、Checkpoint配置不当、MongoDB运行模式不支持增量同步、表结构变更或新增表等因素有关。建议按照上述解决方案逐一排查问题,并根据具体场景调整配置以避免类似问题再次发生。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。