val stream: DataStream[PageView] = env.addSource(new PVSource()).map(node => { Event(node._1, node.2.toString, 1) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorEvent { override def extractTimestamp(element: Event): Long = { element.createTime.toLong } }) .keyBy(.id) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousEventTimeTrigger.of(Time.seconds(3))) .aggregate(new Agg(), new WfPV())
stream
.windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(3)))
.process(new TopWindowAll(3))
.print()
env.execute()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。