Flink CDC里这个问题有人知道咋回事吗?

Flink CDC里这个问题有人知道咋回事吗,就是source 发送的条数总是下游算子两倍,是有重复的读取吗还是怎么回事?0d8da5c713e9995eeac813985b8107e3.png

展开
收起
小小鹿鹿鹿 2024-01-24 12:21:06 51 分享 版权
阿里云 AI 助理回答

在Flink CDC中,如果发现source发送的条数是下游算子的两倍,可能是由于以下原因导致的。以下是详细分析和解决方案:


1. 重复数据投递问题

在非正常情况下(例如故障发生时),CDC工具(如Canal、Debezium、Maxwell等)只能保证at-least-once的投递语义,这可能导致重复的变更事件被投递到Kafka或其他消息队列中。当Flink从这些消息队列中消费数据时,可能会接收到重复的事件。

解决方案:

  • 启用去重机制:建议将作业参数table.exec.source.cdc-events-duplicate设置为true,并在源表上定义PRIMARY KEY。Flink会生成一个额外的有状态算子,使用PRIMARY KEY对变更事件进行去重,并生成一个规范化的changelog流。
  • SQL示例
    CREATE TABLE KafkaTable (
      origin_database STRING METADATA FROM 'value.database' VIRTUAL,
      origin_table STRING METADATA FROM 'value.table' VIRTUAL,
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'canal-json'
    );
    

2. MiniBatch机制的影响

Flink的MiniBatch机制可能会导致下游算子接收到重复的数据。具体来说,当table.exec.agg.mini-batch.output-identical-enabled参数设置为false(默认值)时,MinibatchGlobalAgg节点和MinibatchAgg节点在聚合结果未发生变化的情况下不会向下游发送数据。但如果设置为true,即使聚合结果未变化,也会向下游发送重复数据。

检查与调整:

  • 确认是否启用了MiniBatch机制(table.exec.mini-batch.enabled)。
  • 检查table.exec.agg.mini-batch.output-identical-enabled参数的值。如果不需要重复数据下发,可以将其设置为false
  • 配置示例
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 2s
    table.exec.agg.mini-batch.output-identical-enabled: false
    

3. Source复用导致的重复读取

如果同一个作业中有多个MySQL CDC源表,并且启用了Source复用功能(table.optimizer.source-merge.enabled),可能会导致合并后的Source向下游发送重复数据。

检查与解决:

  • 确认是否启用了Source复用功能:
    SET 'table.optimizer.source-merge.enabled' = 'true';
    
  • 如果启用了Source复用,确保作业无状态启动,因为Source复用会导致作业拓扑改变,可能引发重复数据问题。
  • 重要提醒:在VVR 8.0.8及8.0.9版本中,开启Source复用时还需要额外设置:
    SET 'sql-gateway.exec-plan.enabled' = 'false';
    

4. Binlog解析问题

在增量阶段,Flink CDC通过解析MySQL的Binlog文件生成变更消息。由于Binlog是实例级别的,它会记录所有数据库和表的变更。即使Flink作业只涉及部分表,Binlog仍会包含其他表的变更记录。这种过滤过程是在连接器层面完成的,而不是在MySQL层面。

解决方案:

  • 确保Flink CDC连接器正确配置了需要读取的表名(table-name)。
  • 如果带宽消耗较大,可以通过Source复用减少Binlog读取的压力:
    SET 'table.optimizer.source-merge.enabled' = 'true';
    

5. 其他可能的原因

  • 并行度设置不当:如果Source的并行度高于实际需求,可能会导致重复数据的产生。检查并调整Source的并行度。
  • Checkpoint机制:如果Checkpoint失败或频繁触发,可能会导致重复数据的产生。确保Checkpoint配置合理。

总结与建议

根据上述分析,您可以按照以下步骤排查和解决问题: 1. 检查是否启用了CDC去重机制(table.exec.source.cdc-events-duplicate)。 2. 确认MiniBatch机制的配置是否合理(table.exec.agg.mini-batch.output-identical-enabled)。 3. 检查是否启用了Source复用功能,并确保无状态启动。 4. 确保Flink CDC连接器正确配置了需要读取的表名。 5. 调整Source并行度和Checkpoint配置。

如果问题仍未解决,建议提供更详细的作业配置和日志信息,以便进一步分析。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理