大家好
我有一个去重的需求,想节省内存用的bloomfilter,代码如下:
.keyBy(_._1).process(new KeyedProcessFunctionString,(String,String),String {
var state:ValueState[BloomFilter[CharSequence]]= null
override def open(parameters: Configuration): Unit = {
val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHintBloomFilter[CharSequence]{}))
state = getRuntimeContext.getState(stateDesc)
}
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {
var filter = state.value
if(filter==null){
println("null filter")
filter= BloomFilter.createCharSequence}
//val contains = filter.mightContain(value._2)
if(!filter.mightContain(value._2)) {
filter.put(value._2)
state.update(filter)
out.collect(value._2)
}
}
})
通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊
*来自志愿者整理的flink邮件归档
你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果
savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。