尚硅谷学习笔记
6.5 窗口函数
增量聚合函数(ReduceFunction / AggregateFunction)
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
全窗口函数(full window functions)
我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
1)窗口函数(WindowFunction)
WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。
stream .keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。
不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。
2)处理窗口函数(ProcessWindowFunction)
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
代码实现如下:
public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // 1. 窗口分配器 WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> process = sensorWS .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long count = elements.spliterator().estimateSize(); long windowStartTs = context.window().getStart(); long windowEndTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ); process.print(); env.execute(); } }
增量聚合和全窗口函数的结合使用
public class WindowAggregateAndProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // 1. 窗口分配器 WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 2. 窗口函数: /** * 增量聚合 Aggregate + 全窗口 process * 1、增量聚合函数处理数据: 来一条计算一条 * 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数 * 3、经过全窗口函数的处理包装后,输出 * * 结合两者的优点: * 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少 * 2、全窗口函数: 可以通过 上下文 实现灵活的功能 */ // sensorWS.reduce() //也可以传两个 SingleOutputStreamOperator<String> result = sensorWS.aggregate( new MyAgg(), new MyProcess() ); result.print(); env.execute(); } public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{ @Override public Integer createAccumulator() { System.out.println("创建累加器"); return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value="+value); return accumulator + value.getVc(); } @Override public String getResult(Integer accumulator) { System.out.println("调用getResult方法"); return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { System.out.println("调用merge方法"); return null; } } // 全窗口函数的输入类型 = 增量聚合函数的输出类型 public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{ @Override public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } }
** 6.6 其他API**
触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...) .window(...) .trigger(new MyTrigger())
移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...) .window(...) .evictor(new MyEvictor())