背景
一般情况下 Flink 提供的窗口可以满足我们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午 7 点到第二天上午 7 点。类似于这种情况 Flink 默认提供的窗口是不支持的,因为 Flink 计算窗口的开始时间和结束时间是根据数据本身携带的时间戳然后把数据划分到不同的窗口的,所以它不是一个固定的范围。这个时候就需要我们自己实现窗口划分的逻辑。Flink 提供了 WindowAssigner 抽象类,我们只需要实现 assignWindows 方法即可。
WindowAssigner 源码
@PublicEvolving public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; /** * Returns a {@code Collection} of windows that should be assigned to the element. * * @param element The element to which windows should be assigned. * @param timestamp The timestamp of the element. * @param context The {@link WindowAssignerContext} in which the assigner operates. */ public abstract Collection<W> assignWindows( T element, long timestamp, WindowAssignerContext context); /** Returns the default trigger associated with this {@code WindowAssigner}. */ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); /** * Returns a {@link TypeSerializer} for serializing windows that are assigned by this {@code * WindowAssigner}. */ public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig); /** * Returns {@code true} if elements are assigned to windows based on event time, {@code false} * otherwise. */ public abstract boolean isEventTime(); }
其中 assignWindows 方法决定了一条数据应该划分到几个窗口里面,getDefaultTrigger 返回和 WindowAssigner 相关联的默认触发器,决定何时触发窗口计算,getWindowSerializer 返回窗口的序列化器,isEventTime 返回是否是 eventtime 时间语义。
自定义 MyEventTimeWindow 实现
package flink.streaming.window; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; import java.util.List; /** * 自定义实现 window */ public class MyEventTimeWindow extends WindowAssigner<Object, TimeWindow> { // 窗口的大小 private final long size; // 多长时间滑动一次 private final long slide; // 窗口偏移量 private final long offset; protected MyEventTimeWindow(long size, long slide, long offset) { this.size = size; this.slide = slide; this.offset = offset; } public static MyEventTimeWindow of(Time size, Time slide, Time offset) { return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds()); } public static MyEventTimeWindow of(Time size, Time slide) { return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), 0L); } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext windowAssignerContext) { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(timestamp); // 设置从每天的0点开始计算 calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); // 获取窗口的开始时间 其实就是 0 点 long winStart = calendar.getTimeInMillis(); // 获取窗口的结束时间,就是在开始时间的基础上加上窗口的长度 这里是 1 天 calendar.add(Calendar.DATE, 1); // 获取窗口的结束时间 其实就是第二天的 0 点 long winEnd = calendar.getTimeInMillis() + 1; String format = String.format("window的开始时间:%s,window的结束时间:%s", winStart, winEnd); System.out.println(format); // 当前数据所属窗口的结束时间 long currentWindowEnd = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + slide; System.out.println(TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + "====" + currentWindowEnd); // 一条数据属于几个窗口 因为是滑动窗口一条数据会分配到多个窗口里 int windowCounts = (int) ((winEnd - currentWindowEnd) / slide); List<TimeWindow> windows = new ArrayList<>(windowCounts); long currentEnd = currentWindowEnd; if (timestamp > Long.MIN_VALUE) { while (currentEnd < winEnd) { windows.add(new TimeWindow(winStart, currentEnd)); currentEnd += slide; } } return windows; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) { return EventTimeTrigger.create(); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } }
这里主要有两个问题:
1,数据被分到几个窗口?窗口的长度 / 窗口滑动的步长 = 窗口的个数。
2,窗口的开始时间和结束时间怎么计算?对应的 TimeWindow#getWindowStartWithOffset 方法。
getWindowStartWithOffset 源码
/** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
窗口的开始时间主要就是通过上面的算法计算而来,有了窗口的开始时间,那结束时间就非常简单了,直接加上窗口的大小就好了。
验证结果
window = ds.window(MyEventTimeWindow.of(Time.days(1), Time.hours(1))) window的开始时间:1639238400000(2021-12-12 00:00:00),window的结束时间:1639324800001(2021-12-13 00:00:00) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639242000000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639245600000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639249200000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639252800000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639256400000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639260000000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639263600000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639267200000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639270800000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639274400000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639278000000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639281600000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639285200000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639288800000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639292400000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639296000000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639299600000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639303200000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639306800000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639310400000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639314000000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639317600000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639321200000) 3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639324800000)
窗口的开始时间是 2021-12-12 00:00:00,窗口的结束时间是 2021-12-13 00:00:00。窗口的长度是 24 小时,每隔 1 小时滑动一次,所以一条数据会分配到 24 个窗口里面,所以触发了 24 个窗口计算,结果也没有问题,这样就实现了任意时间的滑动窗口。