"用例:使用来自Kafka记录的EventTime和提取的时间戳。
myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
.keyBy(""platform"")
.window(TumblingEventTimeWindows 5 mins))
.aggregate(AggFunc(), WindowFunc())
.countWindowAll(size)
.apply(someFunc)
.addSink(someSink);
我想要的是:Flink提取时间戳并为每个记录发出初始间隔(例如20秒)的水印,然后它可以周期性地发出水印(例如每10秒)。
原因:如果我使用了PeriodicWatermark,在开始时Flink只会在一段时间后发出水印,并且在我的第一个窗口中5分钟的计数是错误的 - 远大于后续窗口中的计数。我有一个解决方法将setAutoWatermarkInterval设置为100毫秒,但这是必要的。
目前,我必须使用AssignerWithPeriodicWatermark或AssignerWithPunctuatedWatermark。我如何实施这种组合策略的方法?"
"在用水印生成器做一些不寻常的事情之前,我会仔细检查你是否已经正确诊断了这种情况。事实上,事件时间窗口应该具有确定性,并且如果使用相同的输入,则始终产生相同的结果。如果你获得的第一个窗口的结果会根据水印生成的频率而变化,这表示你可能会在水印更频繁地到达时发生延迟事件,并且在水印较少时可以包含这些事件频繁。也许你的水印不正确地说明了你的活动遇到的实际无序程度?或者你的水印可能是基于System.currentTimeMillis()而不是事件时间戳?
此外,第一次窗口与其他窗口不同是正常的,因为时间窗口与时期对齐,而不是第一次事件。当然,这会产生第一个窗口比所有其他窗口占用更短的时间,因此你应该期望它包含更少的事件,而不是更多。
将setAutoWatermarkInterval设置为100ms是一件非常正常的事情。但是如果你真的想避免这种情况,你可以考虑一个AssignerWithPunctuatedWatermarks,它最初为每个事件返回一个水印,然后在一个合适的间隔之后,更少地返回水印。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。