
Flink 1.15.4 CDC: restoring from a checkpoint has no effect, the job still reads and writes the full data set

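The job syncs data from MongoDB into upsert-kafka. After all the data had been read into Kafka, I stopped the job manually and restored it from the latest checkpoint. After the restore, the job still read the full data set, and the full data set was written to the Kafka topic again. What is the cause? The state backend is RocksDB.
Job parameters: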
SET 'execution.checkpointing.interval' = '10s';
SET 'table.exec.state.ttl' = '24h';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';

Key Flink log output:
2024-05-08 12:42:09,382 INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Finished restoring from state handle: IncrementalRemoteKeyedStateHandle{backendIdentifier=4674d965-fa7b-4792-bb1d-d756be2143bc, stateHandleId=0736bcc7-1550-4cbb-9e6c-980b6e0060a8, keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=207, sharedState={000039.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/3e084650-ecb7-43e8-9a33-604e79cc42d4 [136417 bytes], 000040.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/68383a53-7470-4a76-92cd-c39ba01813df [22812 bytes], 000037.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/ae6c0d52-1bb6-4dd8-859c-2db750137e18 [2160875 bytes], 000038.sst=File State: file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/0f1b5b7c-ce5a-4cf2-976d-3ed0a6ed48a4 [175706 bytes]}, privateState={MANIFEST-000004=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/6336cf2e-1c1d-4279-936a-3801610d9f3d', dataBytes=4119}, OPTIONS-000013=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/33772295-2caa-4ed3-a11b-0e450ad18662', dataBytes=17399}, CURRENT=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/shared/5d6f5923-8d73-410c-9903-abe9e385a7fb', dataBytes=16}}, metaStateHandle=ByteStreamStateHandle{handleName='file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/chk-207/23800fbd-d37c-4281-a49a-0ae5a3ba1d4a', dataBytes=16176}, registered=false} without rescaling.
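For reference, the Flink SQL client resumes the next submitted job from a savepoint or retained checkpoint when `execution.savepoint.path` is set before the INSERT statement is run. A minimal sketch, assuming the job is submitted through the SQL client and reusing the chk-207 directory that appears in the log; the table names `mongo_cdc_source` and `kafka_upsert_sink` are hypothetical placeholders for the job's real tables.

```sql
-- Point the next submitted DML job at the retained checkpoint from the log.
SET 'execution.savepoint.path' = 'file:/opt/bigdata/flink-1.15.4/data/flink-checkpoints/09e8b906d8cafbcadd0320ca779c62b7/chk-207';

-- Hypothetical table names: resubmit the same INSERT statement as the original job.
INSERT INTO kafka_upsert_sink SELECT * FROM mongo_cdc_source;

-- The path applies to all following DML statements, so clear it afterwards.
RESET 'execution.savepoint.path';
```

The log line above already reports RocksDB keyed state for checkpoint 207 being restored, so in this case the restore path itself appears to have been picked up.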

hlg4kunfbuxuq 2024-05-08 13:25:45
1 answer
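  • If the Flink SQL job includes the setting SET 'table.exec.state.ttl' = '24h';, then restoring from a specified checkpoint or savepoint performs a full re-sync again. Without the TTL setting, restoring from a checkpoint or savepoint works as expected.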

    2024-05-08 16:59:45
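Following the answer above, this is a sketch of the job settings from the question with the TTL option removed, assuming the rest of the job (table definitions and the INSERT statement) stays unchanged. One trade-off to keep in mind: `table.exec.state.ttl` defaults to 0, meaning idle state is never cleaned up, so operator state may keep growing without it.

```sql
SET 'execution.checkpointing.interval' = '10s';
-- 'table.exec.state.ttl' intentionally omitted: per the answer, with this option set,
-- restoring from a checkpoint or savepoint re-synced the full data set.
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
```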
