flink cdc 采集oracle 间歇性丢数据,日志里也没报错的情况,怎么回事?

flink cdc 采集oracle 间歇性丢数据,日志里也没报错的情况,使用 3.0 的 pipline 从 MySQL 向 Doris 同步数据的问题

flink 上跑的任务中断过,所以改用了 timestamp 的模式,然后过程中间,对表结构有过变化,数据复制链路就中断了;

报错提示:Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema.
Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to TIMESTAMP

然后尝试了其他的集中启动模式,都相同。

那是不是只能采用默认的 initial 从头进行数据同步和表结构映射呢?

展开
收起
小小鹿鹿鹿 2024-02-06 21:29:17 169 分享 版权
阿里云 AI 助理回答

问题分析与解决方案

根据您描述的问题,Flink CDC 在采集 Oracle 数据时出现间歇性丢数据的情况,同时在使用 Flink 3.0 的 Pipeline 从 MySQL 向 Doris 同步数据时,由于表结构变更导致数据复制链路中断,并报错 SchemaOutOfSyncException。以下是针对问题的详细分析和解决方案。


1. Flink CDC 采集 Oracle 间歇性丢数据

可能原因

  • 网络波动或连接超时:Oracle 数据库与 Flink 集群之间的网络不稳定可能导致数据丢失。
  • CDC 源端配置不当:例如,未正确设置 fetchSizepollInterval 参数,导致数据捕获不完整。
  • 日志未记录异常:某些情况下,Flink CDC 的日志级别可能未设置为 DEBUGTRACE,导致潜在问题未被记录。

解决方案

  1. 检查网络连通性

    • 确保 Oracle 数据库与 Flink 集群之间的网络稳定,避免因网络抖动导致数据丢失。
    • 如果跨 VPC 访问,建议通过 NAT 网关或专线解决网络连通性问题。
  2. 优化 CDC 源端配置

    • 调整 fetchSizepollInterval 参数,确保数据捕获的效率和稳定性。例如:
      source:
      type: oracle-cdc
      fetchSize: 1000
      pollInterval: 500ms
      
    • 增加 TaskManager 的 CPU 和内存资源,避免因资源不足导致数据处理延迟。
  3. 提升日志级别

    • 将 Flink CDC 的日志级别调整为 DEBUGTRACE,以便捕获更详细的运行信息。具体操作如下:
      • log4j.properties 文件中添加以下配置:
      logger.flink-cdc.name = com.ververica.cdc
      logger.flink-cdc.level = DEBUG
      

2. MySQL 向 Doris 同步数据时表结构变更导致链路中断

报错分析

报错信息 SchemaOutOfSyncException 表明 Flink CDC 的内部 Schema 表示与实际数据库 Schema 不一致。这通常发生在以下场景: - 启动模式为 TIMESTAMP:当表结构在指定的时间戳之后发生变更时,Flink CDC 无法动态同步这些变更。 - 启动模式为其他模式(如 LATEST_OFFSET):同样不支持动态表结构变更。

解决方案

  1. 采用 INITIAL 模式重新同步

    • 如果表结构变更频繁且无法避免,建议使用 initial 模式从头开始同步数据。此模式会重新读取全量数据并同步最新的表结构。
    • 示例配置:
      source:
      type: mysql-cdc
      startupMode: initial
      
  2. 手动处理表结构变更

    • 如果无法接受全量同步,可以手动处理表结构变更:
      1. 停止当前同步任务。
      2. 删除下游 Doris 表。
      3. 无状态重启同步任务(即清除 Checkpoint 或 Savepoint)。
      4. 确保上下游表结构一致后重新启动任务。
  3. 启用动态 Schema 支持

    • 如果使用的是 Flink CDC 3.0,可以通过配置动态 Schema 支持来减少表结构变更的影响。例如:
      source:
      type: mysql-cdc
      enableDynamicSchema: true
      
    • 注意:动态 Schema 支持需要下游存储(如 Doris)能够兼容表结构变更。

3. 关于启动模式的选择

各启动模式的特点

  • INITIAL:从头开始同步全量数据,适用于表结构变更频繁的场景。
  • TIMESTAMP:从指定时间戳开始同步增量数据,但不支持表结构变更。
  • LATEST_OFFSET:从最新偏移量开始同步增量数据,同样不支持表结构变更。

推荐选择

  • 如果表结构变更不可避免,建议使用 initial 模式进行全量同步。
  • 如果表结构变更较少且可以手动干预,可以选择 timestamplatest-offset 模式,并在变更后手动处理。

4. 其他注意事项

性能优化

  • 增加 TaskManager 资源:如果 Source 出现 finish split response timeout 异常,可能是 TaskManager 的 CPU 使用率过高导致。建议增加 TaskManager 的 CPU 和内存资源。
  • 调整写入参数:对于 Doris 结果表,可以调整 batchSizeflushIntervalMs 参数以提高写入性能。例如:
    sink:
    type: doris
    batchSize: 500
    flushIntervalMs: 2000
    

监控与告警

  • 启用日志归档:在实时计算开发控制台中开启日志归档功能,便于后续排查问题。
  • 配置告警规则:通过云监控设置钉钉、电话等多种方式的告警,及时发现并处理异常。

总结

针对您的问题,建议采取以下措施: 1. 对于 Oracle 数据采集丢数据问题,优化网络连通性和 CDC 源端配置,并提升日志级别以捕获更多细节。 2. 对于 MySQL 向 Doris 同步数据时表结构变更导致的链路中断问题,优先考虑使用 initial 模式重新同步数据,或手动处理表结构变更。 3. 根据实际需求选择合适的启动模式,并注意性能优化和监控告警配置。

希望以上解答能够帮助您解决问题!如有进一步疑问,请随时联系技术支持团队。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理