开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC为什么我同步mongodb到kafka完成后点击savepoint停止?

Flink CDC为什么我同步mongodb到kafka完成后点击savepoint停止后,savepoint那么大?image.png
mysql到kafka就不会有这个问题image.png

展开
收起
真的很搞笑 2023-10-30 17:41:13 60 0
3 条回答
写回答
取消 提交回答
  • Flink CDC同步MongoDB到Kafka后,点击savepoint停止后,savepoint文件过大的原因可能有以下几点:

    1. Flink CDC在实现过程中,会分为主动查询和事件接收两种技术实现模式。对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动。因此,如果有大量的数据变更,那么保存的文件可能会相对较大。

    2. 未启用savepoint目录的压缩功能。Flink中提供了针对checkpoint和savepoint的数据进行压缩的方法,目前Flink仅支持通过用snappy压缩算法对状态数据进行压缩。如果未开启此功能,那么保存的文件可能会占用较大的存储空间。

    3. 在某些情况下,例如启用了entropy injection,savepoint目录可能不包含所有的数据文件,因为注入的路径会分散在各个路径中。由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。这也可能是导致savepoint文件过大的原因之一。

    总的来说,你可以尝试以下方法来减小savepoint的大小:

    • 尝试关闭savepoint目录的压缩功能,或者改变压缩算法;
    • 调整Flink的配置,如调整持久化策略、调整Checkpoint的间隔等;
    • 检查Flink任务的逻辑,看是否有可以优化的地方,比如减少不必要的数据输出等。
    2023-10-31 19:02:38
    赞同 展开评论 打赏
  • 您好,Flink CDC同步mongodb到kafka完成后savepoint过大的问题可能是由于以下原因导致的:

    1. MongoDB CDC Connector在增量同步阶段写入并行度强制为1,这意味着即使默认并行度设置得较高,但由于只有一个subtask在进行写入,因此性能无法进一步提升。
    2. Flink作业中对同一个数据库复用了一个CDC source实例,连接多个sink将不同表中的数据分发至不同的Kafka topic。这种整库同步解决方案可能导致savepoint文件较大。
    3. Flink SQL目前只能进行单表Flink CDC的作业操作,这可能导致数据库CDC的连接数过多,从而影响savepoint的大小。
    2023-10-31 15:21:13
    赞同 展开评论 打赏
  • 即 MongoDB CDC 数据源只能作为一个 upsert source。不过 Flink 框架会自动为 MongoDB
    附加一个 Changelog Normalize 节点,补齐 update 事件的前镜像值(即 UPDATE_BEFORE 事件),从而确保 CDC 数据的语义正确性。mongodb 6.0配合最新的master分值就不需要补了MongoDB 6.0 之前的版本默认不会提供变更前文档及被删除文档的数据;利用这些信息只能实现 Upsert 语义(即缺失了 Update Before 数据条目)。但在 Flink 中许多有用的算子操作都依赖完整的 Insert、Update Before、Update After、Delete 变更流。如果需要补充缺失的变更前事件,一个自然的思路是在 Flink 状态中缓存所有文档的当前版本快照;在遇到被更新或删除的文档时,查表即可得知变更前的状态。然而在最坏的情况下,这种操作可能需要保存相当于 100% 原始数据量的记录。

    目前,Flink SQL Planner 会自动为 Upsert 类型的 Source 生成一个 ChangelogNormalize 节点,并按照上述操作将其转换为完整的变更流;代价则是该算子节点需要存储体积巨大的 State 数据。
    https://zhuanlan.zhihu.com/p/653731844,此回答整理自钉群“Flink CDC 社区”

    2023-10-30 22:26:54
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载