Flink CDC里这个问题有人知道咋回事吗,就是source 发送的条数总是下游算子两倍,是有重复的读取吗还是怎么回事?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中,如果发现source发送的条数是下游算子的两倍,可能是由于以下原因导致的。以下是详细分析和解决方案:
在非正常情况下(例如故障发生时),CDC工具(如Canal、Debezium、Maxwell等)只能保证at-least-once的投递语义,这可能导致重复的变更事件被投递到Kafka或其他消息队列中。当Flink从这些消息队列中消费数据时,可能会接收到重复的事件。
table.exec.source.cdc-events-duplicate
设置为true
,并在源表上定义PRIMARY KEY
。Flink会生成一个额外的有状态算子,使用PRIMARY KEY
对变更事件进行去重,并生成一个规范化的changelog流。CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'canal-json'
);
Flink的MiniBatch机制可能会导致下游算子接收到重复的数据。具体来说,当table.exec.agg.mini-batch.output-identical-enabled
参数设置为false
(默认值)时,MinibatchGlobalAgg节点和MinibatchAgg节点在聚合结果未发生变化的情况下不会向下游发送数据。但如果设置为true
,即使聚合结果未变化,也会向下游发送重复数据。
table.exec.mini-batch.enabled
)。table.exec.agg.mini-batch.output-identical-enabled
参数的值。如果不需要重复数据下发,可以将其设置为false
。table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.agg.mini-batch.output-identical-enabled: false
如果同一个作业中有多个MySQL CDC源表,并且启用了Source复用功能(table.optimizer.source-merge.enabled
),可能会导致合并后的Source向下游发送重复数据。
SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'sql-gateway.exec-plan.enabled' = 'false';
在增量阶段,Flink CDC通过解析MySQL的Binlog文件生成变更消息。由于Binlog是实例级别的,它会记录所有数据库和表的变更。即使Flink作业只涉及部分表,Binlog仍会包含其他表的变更记录。这种过滤过程是在连接器层面完成的,而不是在MySQL层面。
table-name
)。SET 'table.optimizer.source-merge.enabled' = 'true';
根据上述分析,您可以按照以下步骤排查和解决问题: 1. 检查是否启用了CDC去重机制(table.exec.source.cdc-events-duplicate
)。 2. 确认MiniBatch机制的配置是否合理(table.exec.agg.mini-batch.output-identical-enabled
)。 3. 检查是否启用了Source复用功能,并确保无状态启动。 4. 确保Flink CDC连接器正确配置了需要读取的表名。 5. 调整Source并行度和Checkpoint配置。
如果问题仍未解决,建议提供更详细的作业配置和日志信息,以便进一步分析。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。