Flink CDC这两个参数 其中这个参数不能加 checkpoint开启了 之前是丢数据 现在是出现不同步的问题 ?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC在开启Checkpoint后出现了不同步的问题。结合知识库中的相关资料,以下是可能的原因分析及解决方案:
Checkpoint的作用
开启Checkpoint后,Flink CDC可以保证数据的精确一次(EXACTLY_ONCE)语义。但如果配置不当,可能会导致同步延迟或不同步的问题。
可能导致不同步的原因
根据知识库中的信息,以下几种情况可能导致不同步:
execution.checkpointing.interval
设置过大,可能会导致全量同步阶段Checkpoint超时,进而引发Failover。调整Checkpoint间隔时间
建议将execution.checkpointing.interval
设置为合理的值(例如10分钟),以避免Checkpoint超时问题。
示例配置:
execution.checkpointing.interval: 10min
增加Checkpoint失败容忍次数
配置execution.checkpointing.tolerable-failed-checkpoints
参数,允许更多的Checkpoint失败重试次数。例如:
execution.checkpointing.tolerable-failed-checkpoints: 100
关闭全量同步阶段的Checkpoint
如果使用的是Postgres CDC连接器,建议在全量同步阶段关闭Checkpoint,仅在增量同步阶段启用。可以通过以下参数实现:
execution.checkpointing.interval-during-backlog: 0
定期清理无用的Replication Slot
Postgres CDC会在全量同步阶段创建多个临时的Replication Slot。如果这些Slot未被及时清理,可能会导致磁盘空间不足或WAL日志堆积。
解决方法:
SELECT * FROM pg_replication_slots;
SELECT pg_drop_replication_slot('slot_name');
确保Checkpoint正常更新LSN
Postgres CDC只会在Checkpoint完成时更新Replication Slot中的LSN(Log Sequence Number)。如果发现磁盘使用率高,请确认Checkpoint是否正常运行,并检查是否有其他未使用的Slot。
debezium.event.deserialization.failure.handling.mode: warn
或者完全忽略脏数据:
debezium.event.deserialization.failure.handling.mode: ignore
debezium.decimal.handling.mode: string
确保Flink版本兼容性
某些功能(如增量快照、Processing Time Temporal Join)需要特定版本的Flink支持。请确认您的Flink版本是否满足要求。
避免Resume Token失效
如果使用MongoDB CDC,需确保oplog.rs集合大小足够大,以避免Resume Token对应的记录被删除。
合理配置缓存参数
如果使用Python自定义函数,需注意python.fn-execution.bundle.size
和python.fn-execution.bundle.time
参数的配置,避免因缓存过多数据导致Checkpoint失败。
通过以上步骤,您可以逐步排查并解决Flink CDC不同步的问题。如果问题仍未解决,请提供更多上下文信息(如具体的错误日志或配置文件),以便进一步分析。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。