Flink CDC里使用mongodb-cdc是不是一定得显式配置checkpoint?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中使用MongoDB CDC时,是否需要显式配置checkpoint取决于具体的使用场景和功能需求。以下是详细说明:
Checkpoint是Flink实现容错机制的核心功能,用于在作业失败时恢复状态。对于MongoDB CDC连接器,Checkpoint的主要作用包括: - 断点续传:通过记录Change Stream的Resume Token,确保作业可以从上次停止的位置继续读取数据。 - 增量快照支持:如果启用了增量快照功能(scan.incremental.snapshot.enabled=true
),Checkpoint是必需的,因为增量快照依赖于Checkpoint来记录读取进度。
在Flink作业中,可以通过以下方式显式配置Checkpoint参数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 60 * 1000); // 每10分钟触发一次Checkpoint
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100); // 容忍Checkpoint失败次数
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // Checkpoint之间的最小间隔
oplog.rs
中被清理,从而引发无效Resume Token的异常。因此,建议合理设置Checkpoint间隔时间,并调整oplog.rs
的大小以避免日志过早被清理。scan.incremental.snapshot.enabled=true
),必须确保Checkpoint能够正常触发,否则可能导致作业无法正确恢复。以上内容基于知识库资料整理,请根据实际业务需求选择合适的配置方案。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。