开发者社区> 问答> 正文

flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。 数据每隔10秒会过来一批。 代码如下图:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)


env.setParallelism(parallel)


env.addSource(source)
.map(json => {
new InvokingInfoWrapper(xxx)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
invoking.timestamp
}
})
.keyBy(invokingInfo => {
s"${invokingInfo.caller}_${invokingInfo.callee}"
})
.timeWindow(Time.seconds(60), Time.seconds(10))
.reduce(innerReducer).map(invokingInfo => { // ##2map
//some mapping code
invokingInfo
})
.addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")

由于是在预发布环境上线, 流量不大,我观察到一个现场如下: 1. 第一条数据的时间戳为03:15:48 2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口) 3. 第三条数据的时间戳为03:16:06, 触发reduce操作(同样5次) 4. 第四条数据的时间戳为03:17:55, 这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤, 然而并没有。 5. 第五条数据的时间戳为03:18:01, 这时候触发了跟第四条数据的reduce操作。

感觉前三条数据给吞了。

为什么呢?*来自志愿者整理的flink邮件归档

展开
收起
雪哥哥 2021-12-07 16:10:40 1030 0
1 条回答
写回答
取消 提交回答
  • Hi,

    应该是watermark没有达到window的end时间,导致window没有fire。watermark的相关内容可以看这里[1]。其次,你也可以通过job的运行页面[2]查看job当前watermark的值。

    [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams [2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time*来自志愿者整理的flink

    2021-12-07 16:33:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载