Flink CDC里使用增量快照'scan.incremental.snapshot.enabled'='true,启动模式'scan.startup.mode'='initial',checkpoint已经写S3
假设jobmanager崩溃重启,如何读取最近一次的checkpoint?现在是出现sink重复数据的问题呢?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink CDC时,启用了增量快照(scan.incremental.snapshot.enabled='true'
)和启动模式为initial
(scan.startup.mode='initial'
),并且checkpoint已经写入S3。当JobManager崩溃重启后,如何读取最近一次的checkpoint?同时,用户提到出现了sink重复数据的问题。
以下是针对该问题的详细解答:
Flink支持从最近一次的checkpoint恢复作业,具体操作步骤如下:
execution.checkpointing.interval: 60000 # 检查点间隔时间(毫秒)
state.backend: rocksdb # 使用RocksDB作为状态后端
state.checkpoints.dir: s3://your-bucket/checkpoints # checkpoint存储路径
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION # 保留checkpoint
确保这些参数已正确设置,并且checkpoint已成功写入S3。
-s
参数指向最近一次的checkpoint路径。例如:./bin/flink run -s s3://your-bucket/checkpoints/<job-id>/chk-<checkpoint-id> -d your-job.jar
sink
未实现幂等性或未启用Exactly-Once语义,则可能导致重复写入。根据问题描述,以下是几种解决sink重复数据问题的方法:
EXACTLY_ONCE
:execution.checkpointing.mode: EXACTLY_ONCE
INSERT ... ON CONFLICT
语句实现幂等写入。execution.checkpointing.interval
从60秒调整为30秒。scan.startup.mode
配置正确。如果设置为initial
,Flink会在首次启动时扫描全量数据,然后切换到增量数据。如果作业频繁重启,可能会导致全量数据被重复消费。
scan.startup.mode
改为latest-offset
,以避免重复消费全量数据。scan.incremental.snapshot.enabled=true
),请确保server-id
范围足够大以支持并发数。通过上述方法,您可以从最近一次的checkpoint恢复作业,并解决sink重复数据的问题。建议优先启用Exactly-Once语义或实现幂等性,以确保数据一致性。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。