FlinkCDC 3.0版本, mysql是主键模型, 目标表是doris 联合主键的表, cdc启动以后会重复消费mysql, 是什么问题?
这是代码,CREATE TABLE ads_uat_epbplat_qrtz_scheduler_state (
SCHED_NAME
VARCHAR(600),
INSTANCE_NAME
VARCHAR(1000),
LAST_CHECKIN_TIME
BIGINT,
CHECKIN_INTERVAL
BIGINT
) ENGINE=OLAP
UNIQUE KEY (SCHED_NAME
, INSTANCE_NAME
)
DISTRIBUTED BY HASH(SCHED_NAME
, INSTANCE_NAME
) BUCKETS AUTO 这个是目标表的建表语句
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据用户描述,Flink CDC 3.0版本在将MySQL主键模型的数据同步到Doris联合主键表时,出现了重复消费的问题。以下是可能的原因及解决方案。
Flink CDC在读取MySQL数据时,会基于MySQL表的主键生成变更记录(INSERT、UPDATE、DELETE)。如果目标表(Doris)的主键定义与源表(MySQL)不一致,可能会导致数据重复写入。
问题表现:
解决方案: 确保Flink DDL中定义的主键与MySQL物理表的主键完全一致。如果MySQL表没有联合主键,则需要调整Doris表的主键定义,使其与MySQL表保持一致。
示例调整:
CREATE TABLE ads_uat_epbplat_qrtz_scheduler_state (
SCHED_NAME VARCHAR(600),
INSTANCE_NAME VARCHAR(1000),
LAST_CHECKIN_TIME BIGINT,
CHECKIN_INTERVAL BIGINT,
PRIMARY KEY (SCHED_NAME) -- 调整为单一主键
) ENGINE=OLAP
UNIQUE KEY (SCHED_NAME)
DISTRIBUTED BY HASH(SCHED_NAME) BUCKETS AUTO;
重要提醒:Flink DDL中定义的主键必须与MySQL物理表的主键一致,否则会影响数据正确性。
Flink CDC通过Checkpoint机制实现Exactly Once语义。如果Checkpoint配置不当或未启用,可能会导致数据重复消费。
问题表现:
解决方案:
启用Checkpoint: 确保Flink作业启用了Checkpoint,并设置了合理的间隔时间。
execution.checkpointing.interval: 60000 # 每60秒触发一次Checkpoint
execution.checkpointing.mode: EXACTLY_ONCE
检查Binlog消费策略: 在Flink作业重启时,选择从最新状态恢复(而非全新启动),以避免重复消费。
scan.startup.mode: latest-offset
目标表去重逻辑: 如果目标表(Doris)支持UPSERT操作,可以通过ON DUPLICATE KEY UPDATE
语句避免重复插入。
INSERT INTO ads_uat_epbplat_qrtz_scheduler_state (SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME, CHECKIN_INTERVAL)
VALUES ('scheduler1', 'instance1', 123456789, 60000)
ON DUPLICATE KEY UPDATE
LAST_CHECKIN_TIME = VALUES(LAST_CHECKIN_TIME),
CHECKIN_INTERVAL = VALUES(CHECKIN_INTERVAL);
Doris表的分布键(DISTRIBUTED BY HASH
)会影响数据的存储和查询性能。如果分布键设置不合理,可能会导致数据倾斜或重复写入。
问题表现:
解决方案: 调整Doris表的分布键,确保数据均匀分布。例如,可以使用联合主键的所有字段作为分布键。
DISTRIBUTED BY HASH(SCHED_NAME, INSTANCE_NAME) BUCKETS AUTO;
如果同一个Flink作业中有多个MySQL CDC源表,且未开启Source复用,可能会导致多个Binlog Client同时消费同一份数据,从而引发重复消费。
问题表现:
解决方案: 开启Flink CDC Source复用功能,减少对MySQL的压力并避免重复消费。
SET 'table.optimizer.source-merge.enabled' = 'true';
注意:在VVR 8.0.8及8.0.9版本中,还需要额外设置以下参数:
SET 'sql-gateway.exec-plan.enabled' = 'false';
ON DUPLICATE KEY UPDATE
语句避免重复插入。通过以上调整,可以有效解决Flink CDC在同步MySQL数据到Doris时的重复消费问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。