state无法从checkpoint中恢复 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//作业失败后不重启
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); 使用状态的代码private transient ListState<String> counts;
@Override
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
lastUserLogin.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getListState(lastUserLogin);
}
我重启了task managers 后。发现 counts 里面的数据都丢失了
*来自志愿者整理的flink邮件归档
1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
2 能否把你关于 counts 的其他代码也贴一下
你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。