开发者社区 问答 正文

CheckpointedFunction#snapshotState访问键控状态报错

class A extends KeyedProcessFunction<String, Object, String> implements CheckpointedFunction {

private MapState<String, LiveBean> liveBeanState;

  @Override     public void initializeState(FunctionInitializationContext context) throws Exception {         MapStateDescriptor<String, LiveBean> descriptor = new MapStateDescriptor<>("liveState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(LiveBean.class));         this.liveBeanState = context.getKeyedStateStore().getMapState(descriptor);     }

     @Override     public void snapshotState(FunctionSnapshotContext context) throws Exception {         Iterator<Map.Entry<String, LiveBean>> iter = liveBeanState.iterator(); // -> Exception here         while (iter.hasNext()) {             Map.Entry<String, LiveBean> e = iter.next();             hbaseDao.singlePut("table", StringUtils.reverse(e.getKey()), "cf", "info", JSON.toJSONString(e.getValue()));         }     }

}

异常信息如下 java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context. 需要在checkpoint的时候将键控状态存入hbase,该如何处理呢*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:22:00 647 分享
分享
版权
举报
1 条回答
写回答
取消 提交回答
  • Hi snapshotState主要是给operator state用的,异常原因是keyed state 访问时需要设置currentKey的,但是currentKey是当前正在处理的record的key,与snapshotState的执行时候的语义不一样,执行snapshotState方法的时候,是可以没有当前record的。

    如果想要访问整个keyed state,可以通过 KeyedStateBackend#getKeys(String state, N namespace) 来访问,但还是不建议将keyed state写入到HBase,因为Flink更希望你是按照per record的访问,而不是全局访问,后者效率和性能都不好。*来自志愿者整理的flink

    2021-12-05 17:38:37 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等