Hi all 小弟遇到个问题期望大佬解答解答: 通过 env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据,
flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到 “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢?
public class FlinkTestDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000); env.getConfig().setAutoWatermarkInterval(200); env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.mini-batch.enabled", "true"); configuration.setString("table.exec.mini-batch.allow-latency", "6000"); configuration.setString("table.exec.mini-batch.size", "5000"); *来自志愿者整理的flink邮件归档
你配置的是本地目录,不是 hdfs目录,当重启后,可能新的任务运行的机器不是之前的那台机器了,那么之前作业的状态信息(在其他机器上)是不在新的机器上的,那么就会发现找不到状态文件,建议配置成HDFS 的。 *来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。