开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬, Flink CDC 中我也遇到到state一直增大的问题, 启用了增量还是太大, 请问一下?

大佬, Flink CDC 中我也遇到到state一直增大的问题, 启用了增量checkpoint+rockdb还是太大, 请问一下当时你是怎么删掉这个changelogNormalize算子的?

展开
收起
真的很搞笑 2023-07-02 17:45:08 158 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,由于状态管理的问题,可能会出现状态一直增大的情况,导致程序性能和稳定性下降。如果您使用的是 Flink 的 RocksDB StateBackend,可以通过以下方式来缓解状态一直增大的问题:
    启用 TTL 状态:通过启用 TTL 状态,可以自动清除过期的状态,从而减少状态的存储空间和管理负担。例如,可以通过以下代码启用 TTL 状态:
    java
    Copy
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
    stateBackend.enableTtlCompactionFilter();
    env.setStateBackend(stateBackend);
    启用增量 checkpoint:通过启用增量 checkpoint,可以减少每个 checkpoint 中的状态量,从而缓解状态一直增大的问题。例如,可以通过以下代码启用增量 checkpoint:
    java
    Copy
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointInterval(5000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    需要注意的是,上述方式仅能缓解状态一直增大的问题,并不能完全解决问题。如果您的状态仍然持续增大,可能需要考虑采用其他优化技术或者重新设计程序逻辑,以减少状态的存储空间和管理负担。

    2023-07-30 09:36:48
    赞同 展开评论 打赏
  • 在 Flink CDC 中,如果启用了增量Checkpoint和RocksDB作为状态后端,但状态仍然持续增大,可能需要进一步考虑以下几个方面:

    1. 检查状态的使用方式: 检查你的应用程序代码,确认是否有意外的状态写入或状态更新操作。确保状态只存储必要的信息,并在不再需要的情况下及时清理。

    2. 优化数据模型: 如果你的数据模型设计存在问题,例如存储冗余的信息或不必要的细粒度信息,可以考虑进行优化。合理地选择键控状态的划分和压缩策略,以降低状态的大小。

    3. 增量快照策略: 考虑调整增量快照的策略,可以通过增加快照触发的时间间隔、降低增量快照的频率等来减少状态的大小。根据实际需求和数据变化的速度,权衡快照大小和恢复的效率。

    关于 "changelogNormalize" 算子,它是 Flink 内部用于处理状态变化的算子之一。在正常情况下,你不需要手动删除或干预该算子。如果你认为该算子导致了状态持续增大的问题,可能需要进一步检查你的应用程序逻辑,尤其是状态处理部分是否存在问题。

    另外,你还可以尝试以下方法来进一步优化状态的大小:

    - 调整 RocksDB 的配置参数,例如 managed.memory.sizeblock.cache.size 等,以更好地管理和利用内存资源。

    - 考虑将状态拆分为多个较小的状态对象,以减少单个状态的大小。

    - 定期清理不再需要的状态数据,可以使用定时任务或基于条件的策略进行状态清理。

    总而言之,优化 Flink CDC 中持续增大的状态需要结合具体场景和应用程序逻辑来进行调整。通过检查状态使用方式、优化数据模型、调整增量快照策略和合理配置 RocksDB 参数等方法,可以有效地降低状态的大小并提高性能

    2023-07-30 09:39:45
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载