有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。 数据每隔10秒会过来一批。 代码如下图:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
env.setParallelism(parallel)
env.addSource(source)
.map(json => {
new InvokingInfoWrapper(xxx)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
invoking.timestamp
}
})
.keyBy(invokingInfo => {
s"${invokingInfo.caller}_${invokingInfo.callee}"
})
.timeWindow(Time.seconds(60), Time.seconds(10))
.reduce(innerReducer).map(invokingInfo => { // ##2map
//some mapping code
invokingInfo
})
.addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
由于是在预发布环境上线, 流量不大,我观察到一个现场如下: 1. 第一条数据的时间戳为03:15:48 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) 4. 第四条数据的时间戳为03:17:55, 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。
感觉前三条数据给吞了。
为什么呢?*来自志愿者整理的flink邮件归档
Hi,
应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams [2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。