目的呢如题:先基于process_time预聚合,最后基于event_time聚合。
预聚合使用10s窗口,最终聚合使用5min窗口,且使用10s的continuous trigger。
同时,为了避免2个5分钟窗口的数据在窗口临界位置时候,被10s的预聚合到一起(错误case),我在预聚合的时候使用的key中多加了个字段(time),time是格式化到5分钟的结尾时间的time。因此这个问题可以忽略。
但是呢,目前发现一个更大的问题。最终窗口输出的key+time的pv存在变小的情况。刚开始很奇怪,想了很久。然后分析出一些问题。
实际key+time1=2: 1000,变为key+time1:
50的情况下。这个key+time1的50pv实际由2个窗口的数据组成,一部分是time1窗口,一部分time2窗口。但是reduce复用了value1因此最终输出的time为time1。
那么为什么pv是50呢。因为time2哪部分pv本身很小。time1那部分pv是在预聚合的最后某10s的数据(仅10s的数据)。
当然,这个更奇怪的地方是,这2部分pv为什么会被聚合到一起进而被输出。这个才是关键。经过分析,最终想到一个细节,之前忽略了同时也想当然了。window输出的时候设置最大时间戳这个,想当然的认为只针对event_time,没想到还针对process_time,这导致time1窗口中最后几秒的数据实际处理时候,肯定已经处于time2时间了(毕竟数据流肯定是延迟的,统计时的时间肯定大于数据的event时间),因此预聚合输出的数据带了process_time的window的maxTs作为输出元素的时间戳。这按照源码逻辑就是这样的。但是我后边继续将这个时间当作了event_time使用。。。。。这个让我很难受。
其实感觉也有点奇怪,flink这个机制为什么不仅针对event_time,还针对process_time。真么一搞会导致2个时间无法在同一个任务流中使用。。。
当然,目前我想着解决方法也是有的,那就是在预聚合窗口之后reAssignTimestampAndWatermark。
当然还有一种是将预聚合窗口也使用event_time,但是我这个任务的key很少,对准确性要求很高,我设置了maxOutOfOrderness为1整天(为了数据流异常后续补数据的时候任务可以正常处理这部分数据),如果都使用event_time窗口,会导致窗口的状态数据翻2倍。使用处理时间的话,预聚合窗口的状态几乎就可以忽略了。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。