Flink-CDC 采集 Oracle 数据发送到下游 kafka 或其他存储端时, 数据不断膨胀

Flink-CDC 采集 Oracle 数据发送到下游 kafka 或其他存储端时, 数据不断膨胀的问题 (例如: Oracle源库9千万, kafka端2~30亿), 如果不手动关闭采集程序数据就会一直进来

展开
收起
雪哥哥 2022-10-24 17:08:31 814 分享 版权
阿里云 AI 助理回答

针对您遇到的Flink-CDC采集Oracle数据时,数据量在Kafka或其他存储端异常膨胀的问题,以下是一些可能的原因分析及解决方案:

原因分析

  1. 重复消费与处理:如果Flink作业配置不当或Kafka消费者逻辑存在缺陷,可能导致消息被多次处理,从而引起数据膨胀。
  2. Debezium格式处理:Flink SQL在处理Debezium格式的Update类型消息时,会将UPDATE_BEFORE和UPDATE_AFTER转换为DELETE和INSERT两条记录,这可能导致数据量看似增加。
  3. 循环同步问题:如果Flink CDC任务的输出再次被作为输入源同步,可能会形成数据循环同步,导致数据量急剧增长。
  4. Schema变更处理:Flink CDC支持Schema变更同步,但不恰当的处理(如每次变更都生成新列)可能导致数据结构膨胀。

解决方案

  1. 检查并优化消费逻辑

    • 确认Flink作业中消费Kafka消息的部分是否正确实现了幂等性处理,避免重复处理消息。
  2. 调整Debezium消息处理方式

    • 如果使用Debezium格式且发现大量UPDATE操作,理解并接受其转换为DELETE+INSERT的机制。若此机制非预期,考虑自定义数据处理逻辑以合并这些变更事件。
  3. 排查并断开循环同步链路

    • 审查数据流向,确保Flink CDC的输出不会被错误地重新摄入到采集流程中,形成闭环。
  4. 精细化管理Schema变更

    • 在处理数据库Schema变更时,审慎设计目标端的数据模型更新策略,避免不必要的列膨胀。
  5. 监控与日志分析

    • 实施严格的监控,特别是对消息处理计数、数据流速率进行跟踪,通过日志分析识别异常行为。
  6. 资源与配置审查

    • 重新审视Flink作业的资源配置与配置项,如并行度、Checkpoint策略等,确保它们适合当前数据处理需求,避免因配置不当引发的数据堆积或重复处理。

注意事项

  • 数据清理与验证:在调整配置或逻辑后,可能需要对已膨胀的数据进行清理,并验证新的设置是否有效解决了问题。
  • 版本兼容性确认:确保使用的Flink、Debezium以及所有相关组件版本之间兼容,避免因版本不匹配引入的问题。

请根据上述建议逐一排查并实施相应的解决方案,以解决数据膨胀的问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理