Flink CDC为什么我同步mongodb到kafka完成后点击savepoint停止后,savepoint那么大?
mysql到kafka就不会有这个问题
Flink CDC同步MongoDB到Kafka后,点击savepoint停止后,savepoint文件过大的原因可能有以下几点:
Flink CDC在实现过程中,会分为主动查询和事件接收两种技术实现模式。对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动。因此,如果有大量的数据变更,那么保存的文件可能会相对较大。
未启用savepoint目录的压缩功能。Flink中提供了针对checkpoint和savepoint的数据进行压缩的方法,目前Flink仅支持通过用snappy压缩算法对状态数据进行压缩。如果未开启此功能,那么保存的文件可能会占用较大的存储空间。
在某些情况下,例如启用了entropy injection,savepoint目录可能不包含所有的数据文件,因为注入的路径会分散在各个路径中。由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。这也可能是导致savepoint文件过大的原因之一。
总的来说,你可以尝试以下方法来减小savepoint的大小:
您好,Flink CDC同步mongodb到kafka完成后savepoint过大的问题可能是由于以下原因导致的:
即 MongoDB CDC 数据源只能作为一个 upsert source。不过 Flink 框架会自动为 MongoDB
附加一个 Changelog Normalize 节点,补齐 update 事件的前镜像值(即 UPDATE_BEFORE 事件),从而确保 CDC 数据的语义正确性。mongodb 6.0配合最新的master分值就不需要补了MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据;利用这些信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。如果需要补充缺失的变更前事件,一个自然的思路是在 Flink 状态中缓存所有文档的当前版本快照;在遇到被更新或删除的文档时,查表即可得知变更前的状态。然而在最坏的情况下,这种操作可能需要保存相当于 100% 原始数据量的记录。
目前,Flink SQL Planner 会自动为 Upsert 类型的 Source 生成一个 ChangelogNormalize 节点,并按照上述操作将其转换为完整的变更流;代价则是该算子节点需要存储体积巨大的 State 数据。
https://zhuanlan.zhihu.com/p/653731844,此回答整理自钉群“Flink CDC 社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。