开发者社区> 问答> 正文

CheckpointedFunction#snapshotState访问键控状态报错

class A extends KeyedProcessFunction<String, Object, String> implements CheckpointedFunction {

private MapState<String, LiveBean> liveBeanState;

  @Override     public void initializeState(FunctionInitializationContext context) throws Exception {         MapStateDescriptor<String, LiveBean> descriptor = new MapStateDescriptor<>("liveState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(LiveBean.class));         this.liveBeanState = context.getKeyedStateStore().getMapState(descriptor);     }

     @Override     public void snapshotState(FunctionSnapshotContext context) throws Exception {         Iterator<Map.Entry<String, LiveBean>> iter = liveBeanState.iterator(); // -> Exception here         while (iter.hasNext()) {             Map.Entry<String, LiveBean> e = iter.next();             hbaseDao.singlePut("table", StringUtils.reverse(e.getKey()), "cf", "info", JSON.toJSONString(e.getValue()));         }     }

}

异常信息如下 java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context. 需要在checkpoint的时候将键控状态存入hbase,该如何处理呢*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:22:00 611 0
1 条回答
写回答
取消 提交回答
  • Hi snapshotState主要是给operator state用的,异常原因是keyed state 访问时需要设置currentKey的,但是currentKey是当前正在处理的record的key,与snapshotState的执行时候的语义不一样,执行snapshotState方法的时候,是可以没有当前record的。

    如果想要访问整个keyed state,可以通过 KeyedStateBackend#getKeys(String state, N namespace) 来访问,但还是不建议将keyed state写入到HBase,因为Flink更希望你是按照per record的访问,而不是全局访问,后者效率和性能都不好。*来自志愿者整理的flink

    2021-12-05 17:38:37
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
面向失败设计 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载