flink cdc stream 方式,oracle到oracle,是某个设置有问题吗?

flink cdc stream 方式,oracle到oracle,会出现重复插入数据,是某个设置有问题吗?image.png

展开
收起
真的很搞笑 2023-12-24 19:10:24 76 分享 版权
阿里云 AI 助理回答

在使用 Flink CDC Stream 方式从 Oracle 同步到 Oracle 时,出现重复插入数据的问题,可能与以下几个方面的原因和配置有关。以下是详细分析及解决方案:


1. CDC 数据投递语义问题

Flink CDC 在正常操作环境下可以保证 exactly-once 的语义,但在非正常情况下(如故障或 failover),CDC 工具(如 Debezium、Canal 或 Maxwell)通常只能保证 at-least-once 的语义。这可能导致重复的变更事件被投递到下游。

解决方案:

  • 启用去重机制
    在 Flink SQL 中,可以通过设置参数 table.exec.source.cdc-events-duplicatetrue 来启用去重功能,并确保源表定义了主键(PRIMARY KEY)。Flink 系统会生成一个额外的有状态算子(ChangelogNormalize),利用主键对变更事件进行去重并生成规范化的 changelog 流。

    示例配置:

    SET 'table.exec.source.cdc-events-duplicate' = 'true';
    
  • 检查主键定义
    确保源表和目标表都正确声明了主键。如果主键未定义或不唯一,可能会导致重复数据写入。


2. Checkpoint 配置问题

Flink 的 Checkpoint 是保证数据一致性和容错能力的核心机制。如果 Checkpoint 配置不当,可能会导致数据重复。

可能原因:

  • Checkpoint 失败或超时
    如果 Checkpoint 过程中失败或超时,可能会导致部分数据被重复提交。例如,在使用 Batch Tunnel 写入 MaxCompute 时,Checkpoint 失败可能导致数据重复。

  • Failover 恢复时的数据重复
    在 Stream Tunnel 模式下,如果作业发生 failover 并从最新 Checkpoint 恢复,可能会导致 Checkpoint 完成后到 failover 前的数据重复写入。

解决方案:

  • 优化 Checkpoint 配置

    • 调整 Checkpoint 的间隔时间(execution.checkpointing.interval),避免过于频繁的 Checkpoint 导致性能瓶颈。
    • 确保 Checkpoint 存储路径(如 HDFS 或 OSS)具有足够的存储空间和高可用性。
    • 启用增量 Checkpoint(state.backend.incremental),以减少每次 Checkpoint 的开销。

    示例配置:

    execution.checkpointing.interval: 60000 # 每 60 秒触发一次 Checkpoint
    state.backend.incremental: true        # 启用增量 Checkpoint
    
  • 切换到 Batch Tunnel 模式
    如果重复数据问题无法通过上述方法解决,可以考虑切换到 Batch Tunnel 模式,以避免 Stream Tunnel 模式下的数据重复问题。


3. Oracle 表的主键约束问题

在 Oracle 到 Oracle 的同步场景中,如果目标表未正确设置主键约束,可能会导致重复数据写入。

解决方案:

  • 检查目标表的主键约束
    确保目标表已定义主键,并且主键字段能够唯一标识每条记录。如果目标表是事务性表,建议启用事务支持以避免重复数据。

  • 手动去重逻辑
    如果目标表无法修改主键约束,可以在 Flink SQL 中添加去重逻辑。例如,使用 ROW_NUMBER() 函数对数据进行去重:

    SELECT *
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY event_time DESC) AS row_num
      FROM source_table
    ) WHERE row_num = 1;
    

4. CDC 工具的配置问题

如果使用的是 Debezium 或其他 CDC 工具,可能存在配置不当的情况,导致重复数据产生。

解决方案:

  • 检查 Debezium 配置
    确保 Debezium 的 snapshot.modelog.mining.strategy 配置正确。例如,snapshot.mode 设置为 initialschema_only,以避免全量同步时的重复数据。

  • 调整日志挖掘策略
    如果使用 Oracle LogMiner,确保日志挖掘策略(log.mining.strategy)设置为 online_catalogredo_log_catalog,以提高日志解析的准确性。


5. Flink 版本问题

某些 Flink 版本可能存在已知的 Bug,导致数据重复问题。

解决方案:

  • 升级 Flink 版本
    如果当前使用的 Flink 版本较低,建议升级到最新稳定版本(如 vvr-6.0.7-flink-1.15 或更高版本),以修复已知问题并提升性能。

总结

针对 Flink CDC Stream 方式从 Oracle 同步到 Oracle 时出现的重复插入数据问题,可以从以下方面进行排查和解决: 1. 启用 Flink 的去重机制(table.exec.source.cdc-events-duplicate)并定义主键。 2. 优化 Checkpoint 配置,避免因 Checkpoint 失败或 failover 导致的数据重复。 3. 检查目标表的主键约束,必要时添加手动去重逻辑。 4. 确保 CDC 工具(如 Debezium)的配置正确。 5. 升级 Flink 版本以修复潜在的 Bug。

通过以上步骤,可以有效解决重复插入数据的问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理