开发者社区> 问答> 正文

CheckpointedFunction#snapshotState访问键控状态报错

class A extends KeyedProcessFunction<String, Object, String> implements CheckpointedFunction {

private MapState<String, LiveBean> liveBeanState;

  @Override     public void initializeState(FunctionInitializationContext context) throws Exception {         MapStateDescriptor<String, LiveBean> descriptor = new MapStateDescriptor<>("liveState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(LiveBean.class));         this.liveBeanState = context.getKeyedStateStore().getMapState(descriptor);     }

     @Override     public void snapshotState(FunctionSnapshotContext context) throws Exception {         Iterator<Map.Entry<String, LiveBean>> iter = liveBeanState.iterator(); // -> Exception here         while (iter.hasNext()) {             Map.Entry<String, LiveBean> e = iter.next();             hbaseDao.singlePut("table", StringUtils.reverse(e.getKey()), "cf", "info", JSON.toJSONString(e.getValue()));         }     }

}

异常信息如下 java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context. 需要在checkpoint的时候将键控状态存入hbase,该如何处理呢*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:22:01 646 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
面向失败设计 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载