Flink(十)【处理函数】(2)https://developer.aliyun.com/article/1532230
2.4.2、使用 KeyedProcessFunction
上面我们没有按键区分,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为 1,在实际应用中是要尽量避免的,因为如果数据量很大,一个并行度的情况下机器受不了,而且全窗口函数是在最后窗口要关闭(滚动窗口)或者移动(滑动窗口)时才对有窗口内的数据进行计算,所以计算压力可想而知;所以 Flink 官方也并不推荐使用 AllWindowedStream 进行处理。另外,我们在全窗口函数中定义了 HashMap来统计 水位 的出现次数,计算过程是要先收集齐所有数据、然后再逐一遍历更新 HashMap,这显然不够高效。如果我们可以利用增量聚合函数的特性,每来一条数据就更新一次该水位出现的次数,那么到窗口触发计算时只需要做排序输出就可以了。
所以优化的思路就是,先按照 vc 对数据进行 keyBy 分区,然后开窗进行增量聚合。所以我们先用增量聚合函数 AggregateFunction 对每个 vc 的次数进行统计,然后结合 ProcessWindowFunction 排序输出最终结果。
总结:
- 我们首先根据数据的 vc 进行 keyBy 分区,开窗(根据需求开一个滑动窗口,窗口大小10s,滑动步长5s)
- 使用聚合函数(aggregate)结合全窗口函数(ProcessWindowFunction),先把每个区的数据(相同 vc)的次数统计出来得到 count,然后使用全窗口函数把返回值封装成 Tuple3 (vc,count,endWindow)的格式,因为我们要根据不同窗口范围进行统计 TopN
- 上面最终的结果是一个普通的 DataStream,我们需要再根据窗口范围(上面Tuple3 中的 endWindow字段)进行一个 keyBy 分区,把每个范围的数据放到一起进行统一排序(这里使用 hashMap(key=windowEnd,value=Tuple3<vc,count,windowEnd>) 进行存储,使用 arrayList 进行排序)。
- 使用定时器,当 processElement 数据到齐后进行触发计算输出。
/** * 案例: 不同水位出现的次数的 topN */ public class KeyedProcessFunctionTopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { // System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp); return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); // todo 思路2: 使用 keyedProcessFunction 实现 KeyedStream<WaterSensor, Integer> keyedStream = sensorDS.keyBy(sensor -> sensor.vc); SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // AggregateFunction 3个泛型参数: 输入类型,累加器类型,输出类型 .aggregate(new AggregateFunction<WaterSensor, Integer, Integer>() { // 累加器初始值 @Override public Integer createAccumulator() { return 0; } // 累加过程: 直接+1,毕竟我们都是相同key @Override public Integer add(WaterSensor value, Integer accumulator) { return accumulator + 1; } // 累加结果直接返回 @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return null; } }, // ProcessWindowFunction的4个泛型参数: 输入类型、输出类型、键类型、窗口类型 // 这里由于我们后面要根据输出结果区分数据是来自哪个窗口的,所以使用了Tuple3<vc,count,windowEnd> 带上了结束窗口的标签 new ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>() { @Override public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception { // 迭代器只有一条数据 所以 iterator.next() 就是它的全部数据了 Integer count = elements.iterator().next(); long windowEnd = context.window().getEnd(); out.collect(Tuple3.of(key, count, windowEnd)); } }); /** * windowAgg:SingOutputStreamOperator 的聚合结果: * vc=1,count=100,windowEnd=10s * vc=2,count=70,windowEnd=10s * vc=3,count=80,windowEnd=10s * 开窗聚合后,就变成了普通的数据流SingOutputStreamOperator(继承自 DataStream),所以我们自己给聚合结果打上了窗口结束的标签(windowEnd) */ // 2. 按照窗口结束标签进行 keyBy 保证同一窗口时间范围的数据统一处理,之后再排序 windowAgg.keyBy(r -> r.f2).process(new TopN(2)).print(); env.execute(); } public static class TopN extends KeyedProcessFunction<Long,Tuple3<Integer,Integer,Long>,String>{ private Map<Long,List<Tuple3<Integer,Integer,Long>>> map; private int threshold; public TopN(int threshold) { this.threshold = threshold; map = new HashMap<>(); } // Tuple3<Integer, Integer, Long> value : Tuple3的元素类型: vc,count,windowEnd @Override public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception { // 进入这个方法的都只是一条数据,要排序就需要都到齐才行 // 1. 存到 hashMap Long windowEnd = value.f2; if (map.containsKey(windowEnd)){ List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd); list.add(value); }else { List<Tuple3<Integer, Integer, Long>> list = new ArrayList<>(); list.add(value); map.put(windowEnd,list); } // 注册一个定时器,windowEnd+1ms 触发 // 因为同一个窗口范围应该同时输出,只不过是一条一条调用processElement方法,1ms就够执行完了 ctx.timerService().registerEventTimeTimer(windowEnd + 1); // 这里 out 不用操作 } // 定时器触发逻辑 @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); // 同一个窗口的计算结果攒齐了,需要开启排序和取 top N // 1. 排序 Long windowEnd = ctx.getCurrentKey(); List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd); list.sort((o1,o2) -> o2.f1-o1.f1); // 2. 取 topN StringBuilder builder = new StringBuilder(); builder.append("===================\n"); // 防止越界,考虑list的size可能不够两个 for (int i = 0; i < Math.min(list.size(),threshold); i++) { Tuple3<Integer, Integer, Long> tuple = list.get(i); builder.append("top").append(i+1).append(": "); builder.append(tuple.f0).append(" -> "); builder.append(tuple.f1).append("\n"); builder.append("窗口结束时间=").append(DateFormatUtils.format(tuple.f2, "yyyy-MM-dd HH:mm:ss.SSS")); builder.append("\n"); builder.append("===================\n"); } // list 用完就可以及时销毁了,节省空间 list.clear(); out.collect(builder.toString()); } } }
输入数据:
s1,1,1 s1,2,1 s1,5,2 s1,8,3 s1,9,1 // 第一个窗口范围 [-5,5),但是等待时间+3s所以8s进行输出,但是我们触发器+1ms所以9s才输出 s1,10,1 s1,13,2 s1,14,3 // 同理,第二个窗口范围 [0,10),14s才输出
输出结果:
=================== top1: 1 -> 2 窗口结束时间=1970-01-01 08:00:05.000 =================== =================== top1: 1 -> 3 窗口结束时间=1970-01-01 08:00:10.000 =================== top2: 2 -> 1 窗口结束时间=1970-01-01 08:00:10.000 ===================
2.5、侧输出流(Side Output)
上下文对象 ctx 提供了侧输出流方法 output(OutTag,value) ,或者如果我们是 keyedStream.processElement() 的话,还可以在 .onTimer() 方法中调用上下文的.output()方法就可以了。我们之前用过好多次了。
/** * 案例: 不同水位出现的次数的 topN */ public class SideOutputDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { // System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp); return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING); SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(sensor -> sensor.id) .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() { // KeyedProcessFunction泛型参数类型:key类型、输入类型、主流输出类型 @Override public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception { // 使用侧输出流告警 if (value.vc > 10) { ctx.output(warnTag, "当前水位=" + value.vc + ">阈值10!!!"); } out.collect(value); } }); process.print("main"); process.getSideOutput(warnTag).print("warn"); env.execute(); } }
输入数据:
s1,1,1 s1,2,2 s1,8,8 s1,10,10 s1,12,12
输出结果:
main> WaterSensor{id='s1', ts=1, vc=1} main> WaterSensor{id='s1', ts=2, vc=2} main> WaterSensor{id='s1', ts=8, vc=8} main> WaterSensor{id='s1', ts=10, vc=10} warn> 当前水位=12>阈值10!!! main> WaterSensor{id='s1', ts=12, vc=12}
总结
这一块知识点特别挺多,与前面的窗口知识关联紧密,都必须熟悉掌握,对感兴趣的事并不能算是一种痛苦,享受知识越来越丰富的过程吧。