开发者社区> 问答> 正文

flink state问题

大家好 

我有一个去重的需求,想节省内存用的bloomfilter,代码如下:

.keyBy(_._1).process(new KeyedProcessFunctionString,(String,String),String {

var state:ValueState[BloomFilter[CharSequence]]= null

override def open(parameters: Configuration): Unit = {

val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHintBloomFilter[CharSequence]{}))

state = getRuntimeContext.getState(stateDesc)

}

override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {

var filter = state.value

if(filter==null){

println("null filter")

filter= BloomFilter.createCharSequence}

//val contains = filter.mightContain(value._2)

if(!filter.mightContain(value._2)) {

filter.put(value._2)

state.update(filter)

out.collect(value._2)

}

}

})

通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-07 15:22:44 676 0
1 条回答
写回答
取消 提交回答
  • 你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果

    savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

    [1]

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档

    2021-12-07 15:51:25
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 案例集(2022版) 立即下载
《Apache Flink-实时即未来》 立即下载
Flink Forward Virtual Conferen 立即下载