flink cdc stream 方式,oracle到oracle,会出现重复插入数据,是某个设置有问题吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用 Flink CDC Stream 方式从 Oracle 同步到 Oracle 时,出现重复插入数据的问题,可能与以下几个方面的原因和配置有关。以下是详细分析及解决方案:
Flink CDC 在正常操作环境下可以保证 exactly-once 的语义,但在非正常情况下(如故障或 failover),CDC 工具(如 Debezium、Canal 或 Maxwell)通常只能保证 at-least-once 的语义。这可能导致重复的变更事件被投递到下游。
启用去重机制:
在 Flink SQL 中,可以通过设置参数 table.exec.source.cdc-events-duplicate
为 true
来启用去重功能,并确保源表定义了主键(PRIMARY KEY)。Flink 系统会生成一个额外的有状态算子(ChangelogNormalize),利用主键对变更事件进行去重并生成规范化的 changelog 流。
示例配置:
SET 'table.exec.source.cdc-events-duplicate' = 'true';
检查主键定义:
确保源表和目标表都正确声明了主键。如果主键未定义或不唯一,可能会导致重复数据写入。
Flink 的 Checkpoint 是保证数据一致性和容错能力的核心机制。如果 Checkpoint 配置不当,可能会导致数据重复。
Checkpoint 失败或超时:
如果 Checkpoint 过程中失败或超时,可能会导致部分数据被重复提交。例如,在使用 Batch Tunnel 写入 MaxCompute 时,Checkpoint 失败可能导致数据重复。
Failover 恢复时的数据重复:
在 Stream Tunnel 模式下,如果作业发生 failover 并从最新 Checkpoint 恢复,可能会导致 Checkpoint 完成后到 failover 前的数据重复写入。
优化 Checkpoint 配置:
execution.checkpointing.interval
),避免过于频繁的 Checkpoint 导致性能瓶颈。state.backend.incremental
),以减少每次 Checkpoint 的开销。示例配置:
execution.checkpointing.interval: 60000 # 每 60 秒触发一次 Checkpoint
state.backend.incremental: true # 启用增量 Checkpoint
切换到 Batch Tunnel 模式:
如果重复数据问题无法通过上述方法解决,可以考虑切换到 Batch Tunnel 模式,以避免 Stream Tunnel 模式下的数据重复问题。
在 Oracle 到 Oracle 的同步场景中,如果目标表未正确设置主键约束,可能会导致重复数据写入。
检查目标表的主键约束:
确保目标表已定义主键,并且主键字段能够唯一标识每条记录。如果目标表是事务性表,建议启用事务支持以避免重复数据。
手动去重逻辑:
如果目标表无法修改主键约束,可以在 Flink SQL 中添加去重逻辑。例如,使用 ROW_NUMBER()
函数对数据进行去重:
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_key ORDER BY event_time DESC) AS row_num
FROM source_table
) WHERE row_num = 1;
如果使用的是 Debezium 或其他 CDC 工具,可能存在配置不当的情况,导致重复数据产生。
检查 Debezium 配置:
确保 Debezium 的 snapshot.mode
和 log.mining.strategy
配置正确。例如,snapshot.mode
设置为 initial
或 schema_only
,以避免全量同步时的重复数据。
调整日志挖掘策略:
如果使用 Oracle LogMiner,确保日志挖掘策略(log.mining.strategy
)设置为 online_catalog
或 redo_log_catalog
,以提高日志解析的准确性。
某些 Flink 版本可能存在已知的 Bug,导致数据重复问题。
针对 Flink CDC Stream 方式从 Oracle 同步到 Oracle 时出现的重复插入数据问题,可以从以下方面进行排查和解决: 1. 启用 Flink 的去重机制(table.exec.source.cdc-events-duplicate
)并定义主键。 2. 优化 Checkpoint 配置,避免因 Checkpoint 失败或 failover 导致的数据重复。 3. 检查目标表的主键约束,必要时添加手动去重逻辑。 4. 确保 CDC 工具(如 Debezium)的配置正确。 5. 升级 Flink 版本以修复潜在的 Bug。
通过以上步骤,可以有效解决重复插入数据的问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。