Flink CDC我本地直接main方法启动 往es塞数据 es的文档id是自动生成的 本地跑没问题 但是放到flink环境中 采集的时候id会重复?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您描述的问题,Flink CDC在本地运行时往Elasticsearch(ES)插入数据时文档ID是自动生成的,且没有问题。但在Flink环境中运行时,采集的数据出现了文档ID重复的情况。以下是针对该问题的详细分析和解决方案:
在Elasticsearch中,文档ID的生成方式取决于是否定义了主键(PRIMARY KEY)。如果未定义主键,Elasticsearch会自动生成随机的文档ID;如果定义了主键,则主键字段的值会被用作文档ID。
本地环境正常的原因:
在本地运行时,可能由于数据量较小或任务运行时间较短,未触发重复数据的写入问题。
Flink环境中出现问题的原因:
Flink CDC在分布式环境下运行时,可能会因为以下原因导致文档ID重复:
at-least-once
语义,可能导致重复的变更事件被投递到Kafka中,进而被Flink消费。为了避免文档ID重复,建议在Elasticsearch结果表中定义主键,并确保主键字段具有唯一性。具体操作如下:
修改DDL语句:
在创建Elasticsearch结果表时,添加PRIMARY KEY
定义。例如:
CREATE TABLE es_sink (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<yourHosts>',
'index' = '<yourIndex>'
);
PRIMARY KEY (user_id)
:指定user_id
作为主键,其值将作为Elasticsearch文档的ID。NOT ENFORCED
:表示不强制校验主键约束,适用于流式场景。确保主键字段唯一性:
确保user_id
字段在源数据中具有唯一性。如果源数据中没有唯一字段,可以通过组合字段生成唯一键。
如果无法修改Elasticsearch结果表的主键定义,可以通过Flink的去重机制解决重复事件问题:
设置作业参数:
在Flink作业中,设置以下参数以启用去重功能:
table.exec.source.cdc-events-duplicate=true
定义主键:
在Flink源表中定义主键,以便去重算子能够正确识别重复事件。例如:
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'canal-json'
);
如果问题仍然存在,可能是CDC工具(如Canal、Debezium、Maxwell等)在投递事件时出现了重复。请检查以下配置:
确保CDC工具的投递语义:
exactly-once
语义。at-least-once
语义,导致重复事件。调整Flink消费策略:
exactly-once
语义。例如,设置以下参数:
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.interval=60000
主键定义的重要性:
强烈建议在Elasticsearch结果表中定义主键,以避免文档ID重复问题。
CDC工具的语义限制:
在分布式环境中,CDC工具的at-least-once
语义可能导致重复事件,需结合Flink的去重机制处理。
测试与验证:
在生产环境中部署前,请务必在测试环境中验证上述配置的有效性。
通过以上方案,您可以有效解决Flink CDC在分布式环境中向Elasticsearch写入数据时文档ID重复的问题。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。