My Flink job syncs data from MongoDB into an upsert-kafka table. After all the data has been read into Kafka, I manually stop the job; when I then restore it from the latest checkpoint, it performs a full read again, and the full data set is appended to the Kafka topic a second time. What could be the cause? The state backend is RocksDB.
Job parameters:
SET 'execution.checkpointing.interval' = '10s';
SET 'table.exec.state.ttl' = '24h';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
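For context, here is a minimal sketch of the kind of job described above, assuming the mongodb-cdc and upsert-kafka connectors; the table names, field names, hosts, and topic are hypothetical and not from the original post:

```sql
-- Assumed source: MongoDB CDC connector (all names/hosts are placeholders)
CREATE TABLE mongo_source (
  _id STRING,
  payload STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'mydb',
  'collection' = 'mycoll'
);

-- Assumed sink: upsert-kafka keyed on the same primary key
CREATE TABLE kafka_sink (
  _id STRING,
  payload STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'mytopic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO kafka_sink SELECT _id, payload FROM mongo_source;
```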
Key Flink log output:
2024-05-08 12:42:09,382 INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Finished restoring from state handle: IncrementalRemoteKeyedStateHandle{backendIdentifier=4674d965-fa7b-4792-bb1d-d756be2143bc, stateHandleId=0736bcc7-1550-4cbb-9e6c-980b6e0060a8, keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=207, sharedState={000039.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/3e084650-ecb7-43e8-9a33-604e79cc42d4 [136417 bytes], 000040.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/68383a53-7470-4a76-92cd-c39ba01813df [22812 bytes], 000037.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/ae6c0d52-1bb6-4dd8-859c-2db750137e18 [2160875 bytes], 000038.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/0f1b5b7c-ce5a-4cf2-976d-3ed0a6ed48a4 [175706 bytes]}, privateState={MANIFEST-000004=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/6336cf2e-1c1d-4279-936a-3801610d9f3d', dataBytes=4119}, OPTIONS-000013=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/33772295-2caa-4ed3-a11b-0e450ad18662', dataBytes=17399}, CURRENT=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/5d6f5923-8d73-410c-9903-abe9e385a7fb', dataBytes=16}}, metaStateHandle=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/chk-207/23800fbd-d37c-4281-a49a-0ae5a3ba1d4a', dataBytes=16176}, registered=false} without rescaling.
One more observation: if the Flink SQL includes SET 'table.exec.state.ttl' = '24h';, then restoring the job from any checkpoint or savepoint triggers a full re-sync. Without the TTL setting, the job resumes normally from a checkpoint or savepoint.
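Restating the observation as the two configuration variants being compared (a sketch; whether the TTL option is actually the root cause is exactly the open question here):

```sql
-- Variant A: full re-sync observed after restoring from checkpoint/savepoint
SET 'execution.checkpointing.interval' = '10s';
SET 'table.exec.state.ttl' = '24h';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';

-- Variant B: restore resumes from where it left off, as expected
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
```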