Flink之窗口 (Window) 下篇2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink之窗口 (Window) 下篇

测试水位线和窗口的使用

之前讲过,当水位线到达窗口结束时间时,窗口就会闭合不再接收迟到的数据,因为根据水位线的定义,所有小于等于水位线的数据都已经到达,所以显然 Flink 会认为窗口中的数据都到达了(尽管可能存在迟到数据,也就是时间戳小于当前水位线的数据)。我们可以在之前生成水位线代码 WatermarkTest 的基础上,增加窗口应用做一下测试:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 将数据源改为 socket 文本流,并转换成 Event 类型
        env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                    }
                })
                // 插入水位线的逻辑
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 5s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    // 抽取时间戳的逻辑
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                )
                // 根据 user 分组,开窗统计
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new WatermarkTestResult())
                .print();
        env.execute();
    }
    // 自定义处理窗口函数,输出当前的水位线和窗口信息
    public static class WatermarkTestResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素,窗口闭合计算时,水位线处于:" + currentWatermark);
        }
    }
}

我们这里设置的最大延迟时间是 5 秒,所以当我们在终端启动 nc 程序,也就是 nc –lk 7777 然后输入如下数据时:

Alice, ./home, 1000
Alice, ./cart, 2000
Alice, ./prod?id=100, 10000
Alice, ./prod?id=200, 8000
Alice, ./prod?id=300, 15000

我们会看到如下结果:

窗口 0 ~ 10000 中共有 3 个元素,窗口闭合计算时,水位线处于:9999

我们就会发现,当最后输入[Alice, ./prod?id=300, 15000]时,流中会周期性地(默认 200毫秒)插入一个时间戳为 15000L – 5 * 1000L – 1L = 9999 毫秒的水位线,已经到达了窗口[0,10000)的结束时间,所以会触发窗口的闭合计算。而后面再输入一条[Alice, ./prod?id=200, 9000]时,将不会有任何结果;因为这是一条迟到数据,它所属于的窗口已经触发计算然后销毁了(窗口默认被销毁),所以无法再进入到窗口中,自然也就无法更新计算结果了。窗口中的迟到数据默认会被丢弃,这会导致计算结果不够准确。

其他 API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,让我们可以更加灵活地控制窗口行为。

触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
  .window(...)
  .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:


onElement():窗口中每到来一个元素,都会调用这个方法。

onEventTime():当注册的事件时间定时器触发时,将调用这个方法。

onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。

clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

可以看到,除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这几个方法的参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。这里提到的“定时器”(Timer),其实就是我们设定的一个“闹钟”,代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作。很明显,对于时间窗口(TimeWindow)而言,就应该是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。关于定时器的内容,我们在后面讲解处理函数(process function)时还会提到。


上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。


CONTINUE(继续):什么都不做

FIRE(触发):触发计算,输出结果

PURGE(清除):清空窗口中的所有数据,销毁窗口

FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

我们可以看到,Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。稍后我们就会看到它们分开操作的场景。


下面我们举一个例子。在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class TriggerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        })
                )
                .keyBy(r -> r.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();
        env.execute();
    }
    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> collector) throws Exception {
            collector.collect(
                    new UrlViewCount(
                            s,
                            // 获取迭代器中的元素个数
                            iterable.spliterator().getExactSizeIfKnown(),
                            context.window().getStart(),
                            context.window().getEnd()
                    )
            );
        }
    }
    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                            new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
                    );
            if (isFirstEvent.value() == null) {
                for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 1000L) {
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }
        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                            new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
                    );
            isFirstEvent.clear();
        }
    }
}

输出结果如下:

UrlViewCount{url='./prod?id=1', count=1, windowStart=2021-07-01 14:44:10.0, windowEnd=2021-07-01 14:44:20.0}
UrlViewCount{url='./prod?id=1', count=1, windowStart=2021-07-01 14:44:10.0, windowEnd=2021-07-01 14:44:20.0}
UrlViewCount{url='./prod?id=1', count=1, windowStart=2021-07-01 14:44:10.0, windowEnd=2021-07-01 14:44:20.0}
UrlViewCount{url='./prod?id=1', count=1, windowStart=2021-07-01 14:44:10.0, windowEnd=2021-07-01 14:44:20.0}

移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
  .window(...)
  .evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

允许延迟(Allowed Lateness)

在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。这也很好理解:窗口触发计算就像发车,如果要赶的车已经开走了,又不能坐其他的车(保证分配窗口的正确性),那就只好放弃坐班车了。


不过在多数情况下,直接丢弃数据也会导致统计结果不准确,我们还是希望该上车的人都能上来。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。


基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整就触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了。


从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

将迟到的数据放入侧输出流

我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?


Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。


基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag)
  .aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

窗口的生命周期

熟悉了窗口 API 的使用,我们再回头梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结。

窗口的创建

窗口的类型和基本信息由窗口分配器(window assigners)指定,但窗口不会预先创建好,而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时,就会创建对应的窗口。

窗口计算的触发

除了窗口分配器,每个窗口还会有自己的窗口函数(window functions)和触发器(trigger)。窗口函数可以分为增量聚合函数和全窗口函数,主要定义了窗口中计算的逻辑;而触发器则是指定调用窗口函数的条件。


对于不同的窗口类型,触发计算的条件也会不同。例如,一个滚动事件时间窗口,应该在水位线到达窗口结束时间的时候触发计算,属于“定点发车”;而一个计数窗口,会在窗口中元素数量达到定义大小时触发计算,属于“人满就发车”。所以 Flink 预定义的窗口类型都有对应内置的触发器。


对于事件时间窗口而言,除去到达结束时间的“定点发车”,还有另一种情形。当我们设置了允许延迟,那么如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间,这期间内到达的迟到数据也会触发窗口计算。这类似于没有准时赶上班车的人又追上了车,这时车要再次停靠、开门,将新的数据整合统计进来。


窗口的销毁

一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw)实现的,而全局窗口不会清除状态,所以就不会被销毁。


在特殊的场景下,窗口的销毁和触发计算会有所不同。事件时间语义下,如果设置了允许延迟,那么在水位线到达窗口结束时间时,仍然不会销毁窗口;窗口真正被完全删除的时间点,是窗口的结束时间加上用户指定的允许延迟时间。

窗口 API 调用总结

到目前为止,我们已经彻底明白了 Flink 中窗口的概念和 Window API 的调用,我们再用一张图做一个完整总结

Window API 首先按照时候按键分区分成两类。keyBy 之后的 KeyedStream,可以调用.window()方法声明按键分区窗口(Keyed Windows);而如果不做 keyBy,DataStream 也可以直接调用.windowAll()声明非按键分区窗口。之后的方法调用就完全一样了。


接下来首先是通过.window()/.windowAll()方法定义窗口分配器,得到 WindowedStream;然 后 通 过 各 种 转 换 方 法 ( reduce/aggregate/apply/process ) 给 出 窗 口 函 数(ReduceFunction/AggregateFunction/ProcessWindowFunction),定义窗口的具体计算处理逻辑,转换之后重新得到 DataStream。这两者必不可少,是窗口算子(WindowOperator)最重要的组成部分。


此外,在这两者之间,还可以基于 WindowedStream 调用.trigger()自定义触发器、调用.evictor()定义移除器、调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData()将迟到数据写入侧输出流,这些都是可选的 API,一般不需要实现。而如果定义了侧输出流,可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流。

迟到数据的处理

有了事件时间、水位线和窗口的相关知识,现在就可以系统性地讨论一下怎样处理迟到数据了。我们知道,所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。


事件时间里用来表示时钟进展的就是水位线(watermark)。对于乱序流,水位线本身就可以设置一个延迟时间;而做窗口计算时,我们又可以设置窗口的允许延迟时间;另外窗口还有将迟到数据输出到测输出流的用法。所有的这些方法,它们之间有什么关系,我们又该怎样合理利用呢?这一节我们就来讨论这个问题。

设置水位线延迟时间

水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。


之前我们讲到触发器时曾提到过“定时器”,时间窗口的操作底层就是靠定时器来控制触发的。既然是底层机制,定时器自然就不可能是窗口的专利了;事实上它是 Flink 底层 API— —处理函数(process function)的重要部分。


所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后,相当于是上帝拨动了琴弦,所有人的表都变慢了。


既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级。


当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。


允许窗口处理迟到数据

水位线延迟设置的比较小,那之后如果仍有数据迟到该怎么办?对于窗口计算而言,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么之后再来的数据就要被丢弃了。


自然想到,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。


这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。


类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟;到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来,然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了。


所以我们将水位线的延迟和窗口的允许延迟数据结合起来,最后的效果就是先快速实时地输出一个近似的结果,而后再不断调整,最终得到正确的计算结果。回想流处理的发展过程,这不就是著名的 Lambda 架构吗?原先需要两套独立的系统来同时保证实时性和结果的最终正确性,如今 Flink 一套系统就全部搞定了。


将迟到数据放入窗口侧输出流

即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?


那就要用到最后一招了:用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。


如果还用赶班车来类比,那就是车已经上高速开走了,这班车是肯定赶不上了。不过我们还留下了行进路线和联系方式,迟到的人如果想办法辗转到了目的地,还是可以和大部队会合的。最终,所有该到的人都会在目的地出现。


所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。我们可以回忆一下之前 6.3.5 小节统计每个 url 浏览次数的代码 UrlViewCountExample,稍作改进,增加处理迟到数据的功能。具体代码如下。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class ProcessLateDataExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取 socket 文本流
        SingleOutputStreamOperator<Event> stream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(" ");
                        return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                    }
                })
                // 方式一:设置 watermark 延迟时间,2 秒钟
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long
                                    recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        // 定义侧输出流标签
        OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
        SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 方式二:允许窗口处理迟到数据,设置 1 分钟的等待时间
                .allowedLateness(Time.minutes(1))
                // 方式三:将最后的迟到数据输出到侧输出流
                .sideOutputLateData(outputTag)
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
        result.print("result");
        result.getSideOutput(outputTag).print("late");
        // 为方便观察,可以将原始数据也输出
        stream.print("input");
        env.execute();
    }
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }
}

我们还是先启动 nc –lk 7777,然后依次输入以下数据:

Alice, ./home, 1000
Alice, ./home, 2000
Alice, ./home, 10000
Alice, ./home, 9000
Alice, ./cart, 12000
Alice, ./prod?id=100, 15000
Alice, ./home, 9000
Alice, ./home, 8000
Alice, ./prod?id=200, 70000
Alice, ./home, 8000
Alice, ./prod?id=300, 72000
Alice, ./home, 8000

下面我们来分析一下程序的运行过程。当输入数据[Alice, ./home, 10000]时,时间戳为10000,由于设置了 2 秒钟的水位线延迟时间,所以此时水位线到达了 8 秒(事实上是 7999毫秒,这里不再追究减 1 的细节),并没有触发 [0, 10s) 窗口的计算;所以接下来时间戳为 9000的数据到来,同样可以直接进入窗口做增量聚合。当时间戳为 12000 的数据到来时(无所谓url 是什么,所有数据都可以推动水位线前进),水位线到达了 12000 – 2 * 1000 = 10000,所以触发了[0, 10s) 窗口的计算,第一次输出了窗口统计结果,如下所示:

result> UrlViewCount{url='./home,', count=3, windowStart=1970-01-01 08:00:00.0, windowEnd=1970-01-01 08:00:10.0}

这里 count 值为 3,就包括了之前输入的时间戳为 1000、2000、9000 的三条数据。


不过窗口触发计算之后并没有关闭销毁,而是继续等待迟到数据。之后时间戳为 15000的数据继续推进水位线,此时时钟已经进展到了 13000ms;此时再来一条时间戳为 9000 的数据,我们会发现立即输出了一条统计结果:

result> UrlViewCount{url='./home,', count=4, windowStart=1970-01-01 08:00:00.0, windowEnd=1970-01-01 08:00:10.0}

很明显,这仍然是[0, 10s) 的窗口,在之前计数值 3 的基础上继续叠加,更新统计结果为4。所以允许窗口处理迟到数据之后,相当于窗口有了一段等待时间,在这期间所有的迟到数据都会立即触发窗口计算,更新之前的结果。

因此,之后时间戳为 8000 的数据到来,同样会立即输出:

result> UrlViewCount{url='./home,', count=5, windowStart=1970-01-01 08:00:00.0, windowEnd=1970-01-01 08:00:10.0}

我们设置窗口等待的时间为 1 分钟,所以当时间推进到 10000 + 60 * 1000 = 70000 时,窗口就会真正被销毁。此前的所有迟到数据可以直接更新窗口的计算结果,而之后的迟到数据已经无法整合进窗口,就只能用侧输出流来捕获了。需要注意的是,这里的“时间”依然是由水

位线来指示的,所以时间戳为 70000 的数据到来,并不会触发窗口的销毁;当时间戳为 72000的数据到来,水位线推进到了 72000 – 2 * 1000 = 70000,此时窗口真正销毁关闭,之后再来的迟到数据就会输出到侧输出流了:

late> Event{user='Alice,', url='./home,', timestamp=1970-01-01 08:00:08.0}

尚硅谷yyds

学习资料来自于尚硅谷:https://www.bilibili.com/video/BV133411s7Sa?p=1

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
6天前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
6天前
|
流计算
Flink窗口——window
Flink窗口——window
26 0
|
5天前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之如果一个窗口区间没有数据,若不会开窗就没法使用triggers赋默认值
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 1
|
4天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如果窗口中没有数据,但是想要在UDAF中输出一个默认值,该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 0
|
6天前
|
BI API 流计算
[实时流基础 flink] 窗口
[实时流基础 flink] 窗口
|
6天前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器
|
6天前
|
SQL Oracle 算法
Flink CDC 数据源问题之不支持窗口聚合如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
56 0
|
6天前
|
监控 Java 流计算
Flink中的窗口操作是什么?请解释其作用和使用场景。
Flink中的窗口操作是什么?请解释其作用和使用场景。
29 0
|
6天前
|
SQL 自然语言处理 机器人
Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
【1月更文挑战第3天】【1月更文挑战第12篇】Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
59 1