开发者社区> 问答> 正文

关于先基于process_time预聚合,再基于event_time聚合的问题。

目的呢如题:先基于process_time预聚合,最后基于event_time聚合。

预聚合使用10s窗口,最终聚合使用5min窗口,且使用10s的continuous trigger。

同时,为了避免2个5分钟窗口的数据在窗口临界位置时候,被10s的预聚合到一起(错误case),我在预聚合的时候使用的key中多加了个字段(time),time是格式化到5分钟的结尾时间的time。因此这个问题可以忽略。

但是呢,目前发现一个更大的问题。最终窗口输出的key+time的pv存在变小的情况。刚开始很奇怪,想了很久。然后分析出一些问题。

实际key+time1=2: 1000,变为key+time1:

50的情况下。这个key+time1的50pv实际由2个窗口的数据组成,一部分是time1窗口,一部分time2窗口。但是reduce复用了value1因此最终输出的time为time1。

那么为什么pv是50呢。因为time2哪部分pv本身很小。time1那部分pv是在预聚合的最后某10s的数据(仅10s的数据)。

当然,这个更奇怪的地方是,这2部分pv为什么会被聚合到一起进而被输出。这个才是关键。经过分析,最终想到一个细节,之前忽略了同时也想当然了。window输出的时候设置最大时间戳这个,想当然的认为只针对event_time,没想到还针对process_time,这导致time1窗口中最后几秒的数据实际处理时候,肯定已经处于time2时间了(毕竟数据流肯定是延迟的,统计时的时间肯定大于数据的event时间),因此预聚合输出的数据带了process_time的window的maxTs作为输出元素的时间戳。这按照源码逻辑就是这样的。但是我后边继续将这个时间当作了event_time使用。。。。。这个让我很难受。

其实感觉也有点奇怪,flink这个机制为什么不仅针对event_time,还针对process_time。真么一搞会导致2个时间无法在同一个任务流中使用。。。

当然,目前我想着解决方法也是有的,那就是在预聚合窗口之后reAssignTimestampAndWatermark。

当然还有一种是将预聚合窗口也使用event_time,但是我这个任务的key很少,对准确性要求很高,我设置了maxOutOfOrderness为1整天(为了数据流异常后续补数据的时候任务可以正常处理这部分数据),如果都使用event_time窗口,会导致窗口的状态数据翻2倍。使用处理时间的话,预聚合窗口的状态几乎就可以忽略了。*来自志愿者整理的flink邮件归档

展开
收起
CCCC 2021-12-02 15:06:03 722 0
1 条回答
写回答
取消 提交回答
  • 这个问题基本分析应该没啥问题……*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:51:20
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Lambda Processing for Near Time Search Indexing 立即下载
对 2000 多亿条数据做一次 group by 需要多久? 立即下载
对2000多亿条数据做一次Group By 需要多久 立即下载