开发者社区> 问答> 正文

Apache Flink - 事件时间窗口

我想在Apache flink中创建键控窗口,以便每个键的窗口在到达键的第一个事件后n分钟执行。是否可以使用事件时间特性来完成(因为处理时间取决于系统时钟,并且不确定第一个事件何时到达)。如果可能,请向事件说明事件时间和水印的分配,并解释如何在n分钟后调用过程窗口功能。

下面是代码的一部分,可以让您了解我目前正在做什么:

        //Make keyed events so as to start a window for a key
        KeyedStream<SourceData, Tuple> keyedEvents = 
                env.addSource(new MySource(configData),"JSON Source")
                .assignTimestampsAndWatermarks(new MyTimeStamps())
                .setParallelism(1)
                .keyBy("service");
        //Start a window for windowTime time
        DataStream<ResultData> resultData=
                keyedEvents
                .timeWindow(Time.minutes(winTime))
                .process(new ProcessEventWindow(configData))
                .name("Event Collection Window")
                .setParallelism(25);

那么,我如何分配事件时间和水印标记,使得窗口跟随第一个事件的事件时间作为起始点并在10分钟后执行(第一个事件的开始时间对于不同的键可以是不同的)。任何帮助将非常感激。

    /------------ ( window of 10 minutes )

Streams |------------ ( window of 10 minutes )

        \------------ ( window of 10 minutes )

编辑:我用于分配时间戳和水印的类

public class MyTimeStamps implements AssignerWithPeriodicWatermarks {

@Override
public long extractTimestamp(SourceData element, long previousElementTimestamp) {

      //Will return epoch of currentTime
    return GlobalUtilities.getCurrentEpoch();
}

@Override
public Watermark getCurrentWatermark() {
    // TODO Auto-generated method stub
    //Will return epoch of currentTime + 10 minutes
    return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
}

}

展开
收起
社区小助手 2018-12-11 16:02:31 2148 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    不久前我在事件时间窗口方面遇到了类似的问题。这是我的流的样子

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Consumer Setup

    val stream = env.addSource(consumer)
    .assignTimestampsAndWatermarks(new WMAssigner)

    // Additional Setup here

    stream
    .keyBy { data => data.findValue("service") }
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    .process { new WindowProcessor }

    //Sinks go here
    我的WMAssigner类看起来像这样(注意:这允许1分钟的无序事件发生,如果你不想允许延迟,你可以扩展一个不同的Timestamp提取器):

    class WMAssigner extends BoundedOutOfOrdernessTimestampExtractorObjectNode) {
    override def extractTimestamp(element: ObjectNode): Long = {

    val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
    tsStr.toLong

    }
    }
    我想用于Watermarks的时间戳是data.ts字段。

    我的WindowProcessor:

    class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {

    val out = ""
    elements.foreach( value => {
      out = value.findValue("data").findValue("outData")
    }
    out.collect(out)

    }
    }

    2019-07-17 23:19:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像