开发者社区> 问答> 正文

ProcessWindowFunction中如何有效清除state呢

 在 new ProcessWindowFunction()中创建了ValueState,想在第二天0点的时候ValueState清空开始重新计算,但是返现ValueState并没有清空,而是叠加前一天的继续计算,这个.clear()方法应该在什么时候加,才能生效呢? 

--部分代码 

  .window(TumblingProcessingTimeWindows.of(Time.days(1)))   .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) 

.process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] { 

        private var pv_st: ValueState[Long] = _            

        override def open(parameters: Configuration): Unit = { 

         pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long])) 

        } 

       override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = { 

          var c_st = 0 

    

          val elementsIterator = elements.iterator 

          // 遍历窗口数据,获取唯一word 

          while (elementsIterator.hasNext) { 

            val ac_name = elementsIterator.next()._2 

            if(!ac_name.isEmpty && ac_name.equals("listentime")){ 

              c_st +=1 

            } 

          } 

          val time: Date = new Date() 

          val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") 

          val date = dateFormat.format(time) 

          // add current 

         pv_st.update(pv_st.value() + c_st) 

          var jsonStr = ""+key.getField(0)+"_"+date+"&" // json格式开始 

          jsonStr += "{"+ 

                    ""yesterday_foreground_play_pv":""+pv_st.value()+ 

                    ""}"; 

          //判断逻辑,是否到第二天,如果到第二天状态数据全部清空,重新累加 

          if(stateDate.equals("") || stateDate.equals(date)){ 

            stateDate=date 

            out.collect(jsonStr) 

          }else{ 

            out.collect(jsonStr) 

            pv_st.clear() 

            stateDate=date 

          } 

        } 

      })*来自志愿者整理的flink邮件归档```js

展开
收起
塔塔塔塔塔塔 2021-12-02 17:53:08 1027 0
1 条回答
写回答
取消 提交回答
  • 从代码看 if(stateDate.equals("") || stateDate.equals(date)) 无法判断究竟是从哪里获取到stateDate变量赋值,不清楚你这里里面的判断逻辑是否能生效。 

    其次,state.clear() 之后,再次获取时,返回值会是null,代码片段里面也没有看出来哪里有对这个的校验。*来自志愿者整理的FLINK邮件归档

    2021-12-02 18:13:23
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载