Flink(十)【处理函数】(1)https://developer.aliyun.com/article/1532227
2. 处理时间定时器
和上面一样,既然是处理时间的话,我们数据中带的事件时间就没用了,这里我们给每个来的数据定义一个五秒后的定时器:
public class KeyedProcessTimerDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> { // System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp); return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo Process:keyed SingleOutputStreamOperator<String> process = sensorKs.process( /** * KeyedProcessFunction<K, T, R> * K: key 的类型 * T: data 的类型 * R: return 的类型 */ new KeyedProcessFunction<String, WaterSensor, String>() { /** * 来一条数据调用一次这个方法 * @param value 数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1.获取定时器并注册 TimerService timerService = ctx.timerService(); // todo 1.1 注册事件时间定时器 // 事件时间 也就是当前数据中的 watermark,如果没有则返回 null // Long currentEventTime = ctx.timestamp(); // 注册定时器 - 事件时间 // timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器) // System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentEventTime + ",注册了一个5s的定时器"); // todo 1.2 注册处理时间定时器 // 处理时间 也就是当前的处理时间 - 系统时间 long currentPs = timerService.currentProcessingTime(); // 注册定时器 - 处理时间 timerService.registerProcessingTimeTimer(currentPs+5000L); // 当处理时间为 当前时间+5s 触发闹钟 System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentPs + ",注册了一个5s后的定时器"); // 删除定时器 - 事件时间 // timerService.deleteEventTimeTimer(); // 删除定时器 - 处理时间 // timerService.deleteProcessingTimeTimer(); // 获取当前水位线 long watermark = timerService.currentWatermark(); } // todo 2.定义触发定时器逻辑 /** * 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!! * @param timestamp 当前时间进展 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发"); } } ); process.print(); env.execute(); } }
测试输入:
s1,1,1 s1,2,2 s1,3,3
输出:
当前key=s1,当前时间为 1703043149860,注册了一个5s后的定时器 当前key=s1,当前时间为 1703043151317,注册了一个5s后的定时器 当前key=s1,当前时间为 1703043152807,注册了一个5s后的定时器 当前的 key= s1现在时间为 1703043154860定时器触发 当前的 key= s1现在时间为 1703043156317定时器触发 当前的 key= s1现在时间为 1703043157807定时器触发
可以看到,处理时间语义下,对于同一个 key 它有可能会触发多次。
3. watermark 的滞后性
@Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1.获取定时器并注册 TimerService timerService = ctx.timerService(); // 获取当前水位线 long watermark = timerService.currentWatermark(); System.out.println("当前数据="+value+",当前watermark="+watermark); }
输入:
s1,1,1 s1,5,5 s1,9,9
输出:
当前数据=WaterSensor{id='s1', ts=1, vc=1},当前watermark=-9223372036854775808 当前数据=WaterSensor{id='s1', ts=5, vc=5},当前watermark=-2001 当前数据=WaterSensor{id='s1', ts=9, vc=9},当前watermark=1999
可以看到,当我们的数据 {s1,1,1} 到达后,watermark 并不是 1-3-1ms = -2001 而是 watermark 的初始值 Inerger.MIN_VALUE,这是因为我们的水位线总是插入到数据后面的,而 processElement 方法一次只能处理一个数据,所以当数据 {s1,1,1} 处理完毕之后 watermark=-2001 才会进入 processElement 方法并更新 watermark。
定时器 - 总结
- 只有 KeyedStream 才有定时器
- 事件时间定时器,通过数据的 watermark 来触发
- 注意:watermark = 当前最大事件时间 - 最大等待时间 -1ms
- 在 processElement 中获取到的 watermark 是上一次的 watermark ,因为 watermark 是在数据后面进入 processElement 方法的。
2.3、窗口处理函数
除了 KeyedProcessFunction , 另外一大类常用的处 理 函 数 ,就是基于窗口的ProcessWindowFunction 和 ProcessAllWindowFunction 了。
2.3.1、窗口处理函数的使用
进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数(apply/process)、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。
窗口处理函数 ProcessWindowFunction 的使用与其他窗口函数类似 ,也是基于WindowedStream 直接调用方法就可以,只不过这时调用的是 .process()。就像我们之前窗口那一章节写的全窗口函数:
public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo 1. 指定窗口分配器:基于处理时间的滚动窗口 WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // todo 2. 指定窗口函数:全窗口函数 SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { /** * * @param key 分组的 key * @param context 上下文 * @param elements 全窗口存的数据 * @param out 采集器 * @throws Exception */ @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String start = sdf.format(new Date(startTs)); String end = sdf.format(new Date(endTs)); long size = elements.spliterator().estimateSize(); out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString()); } }); process.print(); env.execute(); } }
2.3.2、ProcessWindowFnction 解析
ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction { public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; public void clear(Context context) throws Exception {} public abstract class Context implements java.io.Serializable {...} }
ProcessWindowFunction 依然是一个继承了 AbstractRichFunction 的抽象类,它有四个类型参数:
- IN:input,数据流中窗口任务的输入数据类型。
- OUT:output,窗口任务进行计算之后的输出数据类型。
- KEY:数据中键 key 的类型。
- W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口,W就是 TimeWindow。而内部定义的方法,跟我们之前熟悉的处理函数就有所区别了。
因为全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是.processElement(),而是改了.process()。方法包含四个参数。
- key:窗口做统计计算基于的键,也就是之前 keyBy 用来分区的字段。
- context:当前窗口进行计算的上下文,它的类型就是 ProcessWindowFunction内部定义的抽象类 Context。
- elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型。
- out:用来发送数据输出计算结果的收集器,类型为 Collector。
可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合(一个迭代器对象)。而上下文context 所包含的内容也跟其他处理函数有所差别:
public abstract class Context implements java.io.Serializable { public abstract W window(); public abstract long currentProcessingTime(); public abstract long currentWatermark(); public abstract KeyedStateStore windowState(); public abstract KeyedStateStore globalState(); public abstract <X> void output(OutputTag<X> outputTag, X value); }
除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化:
- 这里不再持有TimerService 对象,只能通过 currentProcessingTime()和 currentWatermark()来获取当前时间,所以失去了设置定时器的功能;
- 另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。
与此同时,也增加了一些获取其他信息的方法:
- 可以通过.window()直接获取到当前的窗口对象,
- 也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。
注意:这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前 key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前 key 的所有窗口有效。所以我们会发现,ProcessWindowFunction 中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。
这里有一个问题:没有了定时器,那窗口处理函数就失去了一个最给力的武器,如果我们希望有一些定时操作又该怎么做呢?其实仔细思考会发现,对于窗口而言,它本身的定义就包含了一个触发计算的时间点,其实一般情况下是没有必要再去做定时操作的。如果非要这么干,Flink也提供了另外的途径——使用窗口触发器(Trigger)。在触发器中也有一个TriggerContext,它可以起到类似 TimerService 的作用:获取当前时间、注册和删除定时器,另外还可以获取当前的状态。这样设计无疑会让处理流程更加清晰——定时操作也是一种“触发”,所以我们就让所有的触发操作归触发器管,而所有处理数据的操作则归窗口函数管。
至于另一种窗口处理函数 ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是 AllWindowedStream,相当于对没有 keyBy 的数据流直接开窗并调用.process()方法,但如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口:
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction());
2.4、应用案例 - Top N
案例需求:实时统计一段时间内出现次数最多的水位。例如:统计最近10s内出现最多的两个水位,并且每5s更新一次。我们知道,这可以通过一个滑动窗口来实现,于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。其实这就是著名的“Top N”问题。
2.4.1、使用 ProcessAllWindowFunction
我们的数据类型 WaterSenor 的三个属性(id:传感器id,ts:事件时间,vc:水位高度)
/** * 案例: 不同水位出现的次数的 topN */ public class TopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> { // System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp); return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); // todo 思路1: 不做 keyBy 直接使用一个 hashMap<vc,count> 来累加 统一vc的count值 // 窗口大小:10s,步长:5s sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) .process( new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() { @Override public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { // 定义一个hashMap Map<Integer,Integer> map = new HashMap<>(); for (WaterSensor waterSensor : elements) { int vc = waterSensor.vc; map.put(vc,map.getOrDefault(vc,0)+1); } // 排序输出top2,利用 list 对map根据value进行排序 List<Tuple2<Integer, Integer>> list = new ArrayList<>(); for (Integer vc : map.keySet()) { list.add(Tuple2.of(vc,map.get(vc))); } list.sort(new Comparator<Tuple2<Integer, Integer>>() { @Override public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) { return o2.f1-o1.f1; } }); StringBuilder builder = new StringBuilder(); builder.append("===================\n"); // 防止越界,考虑list的size可能不够两个 for (int i = 0; i < Math.min(list.size(),2); i++) { Tuple2<Integer, Integer> tuple = list.get(i); builder.append("top").append(i+1).append(": "); builder.append(tuple.f0).append(" -> "); builder.append(tuple.f1).append("\n"); } builder.append("窗口结束时间=").append(DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS")); builder.append("\n"); out.collect(builder.toString()); } } ).print(); env.execute(); } }
注意:滑动窗口一定有第一个步长时被触发!到达第 1 个步长触发第 1 个窗口,到达第 2 个步长触发第 2 个窗口。
上面我们定义了一个大小为 10 ,滑动步长为 5 的窗口,并且等待时间为 3
注意:
- 等待时间内来的数据如果不在窗口范围内并不会计入到窗口中!
- 窗口区间是左闭右开的!
所以,这里的窗口:
- 第一个窗口:[-5,5)(注意:第一个窗口并不是[0,10)!!!)
- 第一个窗口:[0,10)
- ...
测试输入数据:
s1,1,1 s2,2,1 s3,3,2 s4,4,2 s5,5,2 // 窗口范围是左闭右开的,这里的 2 并不计数,这里达到第一个滑动步长,所以要等待3s s6,6,1 s7,7,3 s8,8,3 // 此时才触发第一次计算 s10,10,2 s12,12,1 s13,13,4 // 达到第二个滑动步长,再次触发计算
输出结果:
=================== // [-5,5)的结果 top1: 1 -> 2 top2: 2 -> 2 窗口结束时间=1970-01-01 08:00:05.000 =================== // [0,10)的结果 top1: 1 -> 3 top2: 2 -> 3 窗口结束时间=1970-01-01 08:00:10.000
Flink(十)【处理函数】(3)https://developer.aliyun.com/article/1532231