我想在Apache flink中创建键控窗口,以便每个键的窗口在到达键的第一个事件后n分钟执行。是否可以使用事件时间特性来完成(因为处理时间取决于系统时钟,并且不确定第一个事件何时到达)。如果可能,请向事件说明事件时间和水印的分配,并解释如何在n分钟后调用过程窗口功能。
下面是代码的一部分,可以让您了解我目前正在做什么:
//Make keyed events so as to start a window for a key
KeyedStream<SourceData, Tuple> keyedEvents =
env.addSource(new MySource(configData),"JSON Source")
.assignTimestampsAndWatermarks(new MyTimeStamps())
.setParallelism(1)
.keyBy("service");
//Start a window for windowTime time
DataStream<ResultData> resultData=
keyedEvents
.timeWindow(Time.minutes(winTime))
.process(new ProcessEventWindow(configData))
.name("Event Collection Window")
.setParallelism(25);
那么,我如何分配事件时间和水印标记,使得窗口跟随第一个事件的事件时间作为起始点并在10分钟后执行(第一个事件的开始时间对于不同的键可以是不同的)。任何帮助将非常感激。
/------------ ( window of 10 minutes )
Streams |------------ ( window of 10 minutes )
\------------ ( window of 10 minutes )
编辑:我用于分配时间戳和水印的类
public class MyTimeStamps implements AssignerWithPeriodicWatermarks {
@Override
public long extractTimestamp(SourceData element, long previousElementTimestamp) {
//Will return epoch of currentTime
return GlobalUtilities.getCurrentEpoch();
}
@Override
public Watermark getCurrentWatermark() {
// TODO Auto-generated method stub
//Will return epoch of currentTime + 10 minutes
return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
}
}
不久前我在事件时间窗口方面遇到了类似的问题。这是我的流的样子
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//Consumer Setup
val stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new WMAssigner)
// Additional Setup here
stream
.keyBy { data => data.findValue("service") }
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process { new WindowProcessor }
//Sinks go here
我的WMAssigner类看起来像这样(注意:这允许1分钟的无序事件发生,如果你不想允许延迟,你可以扩展一个不同的Timestamp提取器):
class WMAssigner extends BoundedOutOfOrdernessTimestampExtractorObjectNode) {
override def extractTimestamp(element: ObjectNode): Long = {
val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
tsStr.toLong
}
}
我想用于Watermarks的时间戳是data.ts字段。
我的WindowProcessor:
class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
val out = ""
elements.foreach( value => {
out = value.findValue("data").findValue("outData")
}
out.collect(out)
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。