FlinkCDC 3.0版本, mysql是主键模型是什么问题?

FlinkCDC 3.0版本, mysql是主键模型, 目标表是doris 联合主键的表, cdc启动以后会重复消费mysql, 是什么问题?image.png
这是代码,CREATE TABLE ads_uat_epbplat_qrtz_scheduler_state (
SCHED_NAME VARCHAR(600),
INSTANCE_NAME VARCHAR(1000),
LAST_CHECKIN_TIME BIGINT,
CHECKIN_INTERVAL BIGINT
) ENGINE=OLAP
UNIQUE KEY (SCHED_NAME, INSTANCE_NAME)
DISTRIBUTED BY HASH(SCHED_NAME, INSTANCE_NAME) BUCKETS AUTO 这个是目标表的建表语句

展开
收起
真的很搞笑 2024-05-15 13:13:33 88 分享 版权
阿里云 AI 助理回答

问题分析

根据用户描述,Flink CDC 3.0版本在将MySQL主键模型的数据同步到Doris联合主键表时,出现了重复消费的问题。以下是可能的原因及解决方案。


可能原因及解决方案

1. 主键定义不一致

Flink CDC在读取MySQL数据时,会基于MySQL表的主键生成变更记录(INSERT、UPDATE、DELETE)。如果目标表(Doris)的主键定义与源表(MySQL)不一致,可能会导致数据重复写入。

  • 问题表现

    • MySQL表的主键是单一字段,而Doris表使用了联合主键。
    • Flink CDC在写入Doris时,无法正确识别主键,导致重复插入。
  • 解决方案: 确保Flink DDL中定义的主键与MySQL物理表的主键完全一致。如果MySQL表没有联合主键,则需要调整Doris表的主键定义,使其与MySQL表保持一致。

    示例调整

    CREATE TABLE ads_uat_epbplat_qrtz_scheduler_state (
      SCHED_NAME VARCHAR(600),
      INSTANCE_NAME VARCHAR(1000),
      LAST_CHECKIN_TIME BIGINT,
      CHECKIN_INTERVAL BIGINT,
      PRIMARY KEY (SCHED_NAME) -- 调整为单一主键
    ) ENGINE=OLAP
    UNIQUE KEY (SCHED_NAME)
    DISTRIBUTED BY HASH(SCHED_NAME) BUCKETS AUTO;
    

    重要提醒:Flink DDL中定义的主键必须与MySQL物理表的主键一致,否则会影响数据正确性。


2. Flink CDC的Exactly Once语义未生效

Flink CDC通过Checkpoint机制实现Exactly Once语义。如果Checkpoint配置不当或未启用,可能会导致数据重复消费。

  • 问题表现

    • Flink作业重启后,CDC源表从Binlog的某个位置重新消费,导致重复写入目标表。
    • 目标表(Doris)未正确处理重复数据。
  • 解决方案

    1. 启用Checkpoint: 确保Flink作业启用了Checkpoint,并设置了合理的间隔时间。

      execution.checkpointing.interval: 60000 # 每60秒触发一次Checkpoint
      execution.checkpointing.mode: EXACTLY_ONCE
      
    2. 检查Binlog消费策略: 在Flink作业重启时,选择从最新状态恢复(而非全新启动),以避免重复消费。

      scan.startup.mode: latest-offset
      
    3. 目标表去重逻辑: 如果目标表(Doris)支持UPSERT操作,可以通过ON DUPLICATE KEY UPDATE语句避免重复插入。

      INSERT INTO ads_uat_epbplat_qrtz_scheduler_state (SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME, CHECKIN_INTERVAL)
      VALUES ('scheduler1', 'instance1', 123456789, 60000)
      ON DUPLICATE KEY UPDATE
       LAST_CHECKIN_TIME = VALUES(LAST_CHECKIN_TIME),
       CHECKIN_INTERVAL = VALUES(CHECKIN_INTERVAL);
      

3. Doris表的分布键设置不合理

Doris表的分布键(DISTRIBUTED BY HASH)会影响数据的存储和查询性能。如果分布键设置不合理,可能会导致数据倾斜或重复写入。

  • 问题表现

    • 数据在Doris表中分布不均,导致部分分片出现重复数据。
    • 查询性能下降。
  • 解决方案: 调整Doris表的分布键,确保数据均匀分布。例如,可以使用联合主键的所有字段作为分布键。

    DISTRIBUTED BY HASH(SCHED_NAME, INSTANCE_NAME) BUCKETS AUTO;
    

4. Flink CDC Source复用未开启

如果同一个Flink作业中有多个MySQL CDC源表,且未开启Source复用,可能会导致多个Binlog Client同时消费同一份数据,从而引发重复消费。

  • 问题表现

    • 多个CDC源表读取同一MySQL实例的数据。
    • 数据重复写入目标表。
  • 解决方案: 开启Flink CDC Source复用功能,减少对MySQL的压力并避免重复消费。

    SET 'table.optimizer.source-merge.enabled' = 'true';
    

    注意:在VVR 8.0.8及8.0.9版本中,还需要额外设置以下参数:

    SET 'sql-gateway.exec-plan.enabled' = 'false';
    

总结建议

  1. 主键一致性:确保Flink DDL中定义的主键与MySQL物理表的主键一致。
  2. Exactly Once语义:启用Checkpoint并合理配置Binlog消费策略。
  3. 目标表去重:在Doris表中使用ON DUPLICATE KEY UPDATE语句避免重复插入。
  4. 分布键优化:调整Doris表的分布键,确保数据均匀分布。
  5. Source复用:开启Flink CDC Source复用功能,减少重复消费的可能性。

通过以上调整,可以有效解决Flink CDC在同步MySQL数据到Doris时的重复消费问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理