Flink CDC有没有遇到重复写入的问题,用flinksql读kafka往sqlserver数据库里写数据,程序也没有报错,但是就是一直重复写数据,相同的数据好几万条,关掉重启后又好了,不知道是什么原因导致的?很困惑 程序也没报错 也能同步新数据 但是已经写入的数据重复写入
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您的描述,Flink CDC在将Kafka中的数据写入SQL Server数据库时,出现了重复写入的问题。尽管程序没有报错,并且能够正常同步新数据,但已经写入的数据被重复写入了数万条。这种现象可能与以下几个方面有关:
Flink CDC依赖Kafka作为数据源,而Kafka在某些异常情况下(如网络抖动、Broker故障等)可能会导致重复消费。具体原因和解决方法如下:
原因:
at-least-once
语义可能导致重复消费。解决方案:
启用去重机制: 在Flink SQL中,可以通过定义PRIMARY KEY
并启用CDC事件去重功能来避免重复消费。例如:
CREATE TABLE kafka_source (
id BIGINT PRIMARY KEY NOT NULL,
name STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'your_topic',
'properties.bootstrap.servers' = 'your_kafka_broker',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
-- 启用CDC事件去重
SET table.exec.source.cdc-events-duplicate = true;
这样可以确保Flink在消费Kafka数据时对重复事件进行去重。
检查Kafka分区的Watermark生成: 如果某个Kafka分区长时间没有数据,可能会导致Watermark无法正常推进,从而影响基于Event Time的窗口计算。建议开启源数据空闲监测功能:
SET table.exec.source.idle-timeout = 5; -- 单位为秒
Flink作业在Checkpoint或Failover过程中可能出现状态不一致,导致重复写入。具体原因和解决方法如下:
原因:
解决方案:
切换到Batch Tunnel模式: 如果目标存储支持Batch Tunnel模式,建议切换到该模式以减少重复写入的风险。Batch Tunnel模式在Checkpoint失败时不会提交数据,从而避免重复写入。
升级Flink版本: 如果您使用的是较旧的Flink版本(如vvr-6.0.7-flink-1.15之前),建议升级到更高版本。新版本修复了MaxCompute结果表在关闭时提交数据的问题。
如果目标表未设置主键约束或唯一性约束,可能导致重复数据写入。具体原因和解决方法如下:
原因:
解决方案:
为目标表添加主键约束: 确保目标表定义了主键或唯一索引。例如:
ALTER TABLE your_table ADD CONSTRAINT pk_id PRIMARY KEY (id);
在Flink SQL中实现幂等写入: 如果无法修改目标表结构,可以在Flink SQL中通过ROW_NUMBER()
函数实现去重逻辑。例如:
INSERT INTO sqlserver_sink
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rownum
FROM kafka_source
) WHERE rownum = 1;
这样可以确保每条记录只写入一次。
检查Flink日志:
env.java.opts: "-XX:-OmitStackTraceInFastThrow"
监控Kafka消费延迟:
CurrentEmitEventTimeLag
和CurrentFetchEventTimeLag
指标,确认是否存在延迟或异常。验证SQL Server连接器配置:
sink.buffer-flush.max-rows
和sink.buffer-flush.interval
参数,避免因缓冲区刷新策略导致重复写入。通过以上分析,您可以从以下几个方面入手解决问题: 1. 启用CDC事件去重,确保Flink消费Kafka数据时不会重复处理。 2. 优化Flink作业的状态管理,避免Checkpoint或Failover导致的重复写入。 3. 为目标表添加主键约束,确保SQL Server能够检测并拒绝重复数据。
如果问题仍未解决,建议进一步分析Flink日志和Kafka消费行为,定位具体的异常点。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。