开发者社区> 问答> 正文

如何手动读取Savepoints中的内容?

目前有个需求想要实现Flink SQL的保存点,但是由于在SQL中operator uid是随机生成的,一旦修改SQL会导致无法读取到Savepoints中的状态信息。

想到一种方式是正常执行Savepoint操作,然后启动的时候手动读取Savepoint中的内容,获取Kafka每个分区的消费offset,再替换到SQL代码中。

目前通过在源码中打印相关日志可以发现,保存点触发时,消费的记录正常读取到9,但是手动读取的时候,却找不到这个而信息。

下面是我手动读取的代码,以下内容是参考Savepoint单元测试中的读取案例:

String savepointPath = "hdfs://namenode:8020/flink/savepoints/test040603/savepoint-a2cfcd-2ee1c4afcf9f"; CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorageAccess .resolveCheckpointPointer(savepointPath);

try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) { CheckpointMetadata metadata = Checkpoints .loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader(), savepointPath); System.out.println(metadata); } catch (IOException e) { e.printStackTrace(); }

http://apache-flink.147419.n8.nabble.com/file/t1275/%E5%BE%AE%E4%BF%A1%E6%88%AA%E5%9B%BE_20210406145153.png

通过debug发现,有三个operator,但是记录的offset是1038,而不是9,不知道正确的读取方法是什么?*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:18:49 679 0
1 条回答
写回答
取消 提交回答
  • 需要通过BatchEnvironment,把Savepoint当做输入,然后构造序列化解析器和类型等信息,通过DataSet.collect()进行解析,就可以读取到目标数据了。希望对你有所帮助~*来自志愿者整理的flink

    2021-12-05 17:48:17
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载