开发者社区> 问答> 正文

FLINK 不同 StateBackend ProcessWindowFunction的差别?

’--版本 FLINK 1.9.1 ON YARN

--过程 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据 --问题 new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉, 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。 这种计算场景有更好的计算方法吗?

--部分代码 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

new ProcessWindowFunction{ public void process(Tuple tuple, Context context, Iterable elements, Collector out) throws Exception { for (Iterator iter = elements.iterator(); iter.hasNext(); ) { .... iter.remove(); } } .... }*来自志愿者整理的flink邮件归档

展开
收起
船长的小螺号 2021-12-03 10:06:33 856 0
1 条回答
写回答
取消 提交回答
  • 使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]

    至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。

    而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。

    [1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377

    [2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57

    [3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119*来自志愿者整理的FLINK邮件归档

    2021-12-03 10:41:46
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载