开发者社区> 问答> 正文

state无法从checkpoint中恢复

state无法从checkpoint中恢复 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//作业失败后不重启

env.setRestartStrategy(RestartStrategies.noRestart());

env.getCheckpointConfig().setCheckpointTimeout(500);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); 使用状态的代码private transient ListState<String> counts;

@Override

public void open(Configuration parameters) throws Exception {

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.minutes(30))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);

lastUserLogin.enableTimeToLive(ttlConfig);

counts = getRuntimeContext().getListState(lastUserLogin);

}

我重启了task managers 后。发现 counts 里面的数据都丢失了

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 16:06:28 911 0
1 条回答
写回答
取消 提交回答
  • 1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象

    2 能否把你关于 counts 的其他代码也贴一下

    1. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看

    2. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

    [1]

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档

    2021-12-07 10:04:17
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
俞航翔|基于Log的通用增量Checkpoint 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载