Flink CDC初次用【flink run】大家有见过这种情况么?

Flink CDC初次用【flink run】从 oceanbase 全量同步数据到 doris,同步完成后,数据量小的表随着源表的增删改操作能正常同步,但是差不多2亿多条数据量的表在源表有变动的情况下,就没有任何变动。大家有见过这种情况么?
flink 版本: 1.17.0

展开
收起
真的很搞笑 2023-12-31 08:47:24 52 发布于黑龙江 分享
分享
版权
举报
阿里云 AI 助理回答

根据您描述的问题,Flink CDC 在初次全量同步完成后,小数据量的表能够正常进行增量同步,但大数据量(约 2 亿条记录)的表在源表发生变动时无法同步。以下是可能的原因分析及解决方案:


1. 增量同步位点问题

在 OceanBase 数据库中,增量同步依赖于源端数据库的增量日志(如 Redo Log 或 Binlog)。如果增量同步的起始位点未正确设置,可能导致大数据量表的增量数据无法被捕捉。

  • 原因:Flink CDC 可能在全量同步完成后未能正确获取增量日志的起始位点,导致后续的增量数据无法同步。
  • 解决方案
    • 确保在创建数据迁移任务时,选择了 增量同步 类型,并且未选择 全量迁移/全量同步
    • 检查增量同步的起始位点是否正确配置。可以通过手动设置增量同步位点为全量同步完成的时间点来解决此问题。
    • 如果源端数据库的增量日志已过期或归档,请重新初始化同步任务,并确保增量日志的保存时间足够长以覆盖全量同步的时间。

2. Flink CDC 的状态管理问题

Flink CDC 在处理大数据量表时,可能会因为状态管理不当导致增量同步失败。

  • 原因
    • Flink 的 Checkpoint 或 Savepoint 配置不合理,导致增量同步的状态丢失。
    • 大数据量表的全量同步耗时较长,期间可能发生了 Checkpoint 超时或失败。
  • 解决方案
    • 调整 Flink 的 Checkpoint 配置,确保其频率和超时时间适合大数据量表的同步需求。例如:
    execution.checkpointing.interval: 60000 # 每 60 秒触发一次 Checkpoint
    execution.checkpointing.timeout: 300000 # Checkpoint 超时时间为 5 分钟
    
    • 启用增量 Checkpoint(Incremental Checkpoint),减少状态存储的压力。
    • 在全量同步完成后,手动触发一次 Savepoint,确保增量同步的初始状态被正确保存。

3. 源端数据库的性能瓶颈

OceanBase 数据库在处理大数据量表的增量日志时,可能会因为性能瓶颈导致增量数据无法及时生成或读取。

  • 原因
    • 源端数据库的增量日志生成速度较慢,无法满足 Flink CDC 的读取需求。
    • 全量同步过程中对源端数据库的压力过大,影响了增量日志的生成。
  • 解决方案
    • 在全量同步阶段,降低读取并发数以减少对源端数据库的压力。例如:
    source.parallelism: 8 # 设置较低的并发数
    
    • 监控源端数据库的性能指标(如 CPU、内存、I/O),确保其有足够的资源支持增量日志的生成。
    • 如果源端数据库的增量日志保存时间较短,建议延长其保存时间,避免日志过期。

4. Doris 目标端的写入瓶颈

Doris 数据库在接收大数据量表的增量数据时,可能会因为写入性能不足导致同步失败。

  • 原因
    • Doris 的写入并发数配置较低,无法处理高吞吐量的增量数据。
    • 写入速率限制(RPS/BPS)设置不合理,导致数据积压。
  • 解决方案
    • 提高 Doris 的写入并发数配置,确保其能够处理高吞吐量的增量数据。例如:
    sink.parallelism: 16 # 设置较高的并发数
    
    • 调整写入速率限制(RPS/BPS),避免因限流导致数据积压。例如:
    sink.rate-limit: 10000 # 每秒最多写入 10,000 行数据
    

5. DDL 同步的支持问题

如果源表在增量同步过程中发生了 DDL 操作(如新增列、删除列等),可能会导致同步任务中断。

  • 原因
    • Flink CDC 不支持某些复杂的 DDL 操作,例如 ALTER TABLE ADD COLUMN 的所有选项。
    • 如果 DDL 操作未正确同步至目标端,可能导致增量数据无法写入。
  • 解决方案
    • 检查源表的 DDL 操作是否在 Flink CDC 的支持范围内。如果不支持,请联系技术支持人员或手动调整目标表结构。
    • 在增量同步任务中启用 DDL 同步功能,并确保目标端表结构与源端保持一致。

6. 日志和监控排查

为了进一步定位问题,建议通过以下方式排查:

  • 查看 Flink 作业日志:检查是否有异常信息(如 Checkpoint 失败、增量日志读取失败等)。
  • 监控增量同步延迟:通过 OceanBase 和 Doris 的监控工具,查看增量同步的延迟时间。如果延迟过高,可能是性能瓶颈导致的。
  • 开启告警监控:在创建数据迁移任务时,建议开启告警监控,以便随时掌握任务运行状态。

总结

针对您描述的问题,建议从以下几个方面入手: 1. 检查增量同步位点是否正确配置。 2. 调整 Flink 的 Checkpoint 和并发配置,确保状态管理正常。 3. 优化源端和目标端的性能配置,避免性能瓶颈。 4. 确保 DDL 操作在 Flink CDC 的支持范围内。 5. 通过日志和监控工具进一步排查问题。

如果以上方法仍无法解决问题,建议联系阿里云技术支持团队,提供详细的日志和配置信息以协助排查。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理