不多废话,直接上代码,并解释关键代码
三重保证 watermark(水位线) | allowedLateness(最大迟到数据) | sideOutputLateData(侧输出流)
val dataDstream: DataStream[SensorReading] = inputDStream
.map( data => {
val dataArray: Array[String] = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
})
// .assignAscendingTimestamps( _.timestamp * 1000L ) // 理想状态下直接指定时间戳字段就可以了
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading]
// 给WaterMark的一个初始值延时时间
(Time.milliseconds(1000)) {
// 指定时间戳字段以秒为单位 * 1000
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
val resultDStream: DataStream[SensorReading] = dataDstream
.keyBy("id")
.timeWindow( Time.seconds(5) )
.allowedLateness( Time.minutes(1) )
.sideOutputLateData( new OutputTag[SensorReading]("late") )
.reduce( MyReduceFunc() )
dataDstream.print("data")
resultDStream.print("result")
// 获取测输出流的late并打印
resultDStream.getSideOutput( new OutputTag[SensorReading]("late") ).print("late")
env.execute("eventTime test job")
watermark ->assignTimestampsAndWatermarks: 给这个作业设置水位线,这里设置水位线的方法是可以自己在代码中按自己的需求做一个水位线。 大多情况下可以直接按当前系统时间-10 秒
allowedLateness -> allowedLateness: 根据自己需求定义允许延迟时间
sideOutputLateData ->sideOutputLateData: 将 晚于 窗口+allowedLateness 的数据保存下来。
resultDStream.getSideOutput(new OutputTagSensorReading) 这个结果就是。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。