前言
冬天学习成本太高了,每天冻得要死,自习室人满为患,确实是辛苦。学校基本的硬件条件差的一批(图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找),个体无法改变环境只能顺应了(艹,受不了了,去校长信箱轰tnd)。
今天学习 Flink 处理函数,学完这一块就剩状态管理、容错机制和 Flink SQL 了,坚持坚持。学完再好好回顾回顾,最后就是把剩余的一些框架(Kafka、Flume等)补齐了。
1、处理函数
之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于 DataStream 进行转换的;所以可以统称为 DataStream API,这也是 Flink 编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API,DataStream API 只是其中之一,如图:
在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑(我们之前可以从 process 函数中获得 上下文对象 ctx、实现侧输出流等),所以这一层接口就被叫作“处理函数”(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。就相当于我们 Spark 中的 RDD 编程,它是最底层的东西,所以一些上层无法实现的,它都可以实现。
所以,总结一句话就是:只要是现有的算子实现不了的,直接上 process 即可。
1.1、基本处理函数
1.1.1、处理函数的功能和基本使用
我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如 map 算子,我们实现的 MapFunction 中,只能获取到当前的数据,定义它转换之后的形式;而像窗口聚合这样的复杂操作,AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式)。另外我们还介绍过富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度、任务名称之类的运行时信息。
但是无论哪种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。在定义生成规则之后,水位线会源源不断地产生,像数据一样在任务间流动,可我们却不能像数据一样去处理它,因为跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。
这就需要我们使用——处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑(我们之前通过给 process 方法传入一个实现了 ProcessFunction 抽象类的匿名内部类来实现侧输出流、通过给 process 方法传入一个实现了 CoProcessFunction 抽象类的匿名内部类来实现合流 )。
stream.process(new MyProcessFunction());
这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
1.1.2、ProcessFunction 解析
在源码中我们可以看到,抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。
内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @PublicEvolving public abstract class ProcessFunction<I, O> extends AbstractRichFunction { // ... // 核心处理逻辑 public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; // 定时器 public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} // ... }
1. 抽象方法.processElement()
用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。
- value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
- ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
Context 抽象类定义如下:
public abstract class Context { public abstract Long timestamp(); public abstract TimerService timerService(); public abstract <X> void output(OutputTag<X> outputTag, X value); }
- out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
通过几个参数的分析不难发现,ProcessFunction 可以轻松实现 flatMap 这样的基本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。自定义状态的具体实现,我们会在后面学到 “状态管理” 的时候再说。
2. 非抽象方法.onTimer()
用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。
打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。
与.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。
既然有.onTimer()方法做定时触发,我们用 ProcessFunction 也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。我们也可以看到,处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用是.processElement()方法,而处理水位线事件调用的是.onTimer()。
这里需要注意的是,上面的.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数,它们之间还是有一些微小的区别的。接下来我们就介绍一下处理函数的分类。
1.1.3、处理函数的分类
1. 处理函数的功能和使用
Flink 中的处理函数其实是一个大家族,ProcessFunction 只是其中一员。我们知道,DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用 .keyBy() 之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。Flink 提供了 8 个不同的处理函数:
- ProcessFunction。最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
- KeyedProcessFunction。对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream。
- ProcessWindowFunction。开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process() 时作为参数传入。
- ProcessAllWindowFunction。同样是开窗之后的处理函数(没有 keyby 的话传这个),基于 AllWindowedStream 调用.process()时作为参数传入。
- CoProcessFunction。合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
- ProcessJoinFunction。间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
- BroadcastProcessFunction。广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
- KeyedBroadcastProcessFunction。按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。
接下来,我们就对经常用到的 KeyedProcessFunction 和 ProcessWindowFunction 的具体用法展开详细说明。
2.1、按键分区处理函数(KeyedProcessFunction)
在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy 算子对数据流进行“按键分区”,得到一个 KeyedStream。也就是指定一个键(key),按照它的哈希值(hash code)将数据分成不同的“组”,然后分配到不同的并行子任务上执行计算;这相当于做了一个逻辑分流的操作,从而可以充分利用并行计算的优势实时处理海量数据。
另外我们在上节中也提到,只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction,最基本的 ProcessFunction 反而使用率没那么高。接下来我们就先从定时服务(TimerService)入手,详细讲解 KeyedProcessFunction 的用法
2.1.1、定时器(Timer)和定时服务(TimerService)
KeyedProcessFunction 的一个特色,就是可以灵活地使用定时器。定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。
定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一个 TimerService 对象。TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
public abstract TimerService timerService(); TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法: // 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线(事件时间) long currentWatermark(); // 注册处理时间定时器,当处理时间超过 time 时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器,当水位线超过 time 时触发 void registerEventTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteEventTimeTimer(long time);
六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。
需要注意:尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间。
对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳(timestamp)保存起来,排队等待执行。可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个 key 注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。
基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的。这样,我们的代码并不需要做额外的处理,底层就可以直接对不同key 进行独立的处理操作了。
利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次。
1. 事件时间定时器
我们通过 Socket 接收一个无序的数据流(WaterSensor类型),并指定允许迟到 3s,然后进行一个 keyBy ,之后得到 KeyedStream,我们给它定义一个定时器(当事件时间进展到 5s 的时候触发一次):
public class KeyedProcessTimerDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> { // System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp); return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo Process:keyed SingleOutputStreamOperator<String> process = sensorKs.process( /** * KeyedProcessFunction<K, T, R> * K: key 的类型 * T: data 的类型 * R: return 的类型 */ new KeyedProcessFunction<String, WaterSensor, String>() { /** * 来一条数据调用一次这个方法 * @param value 数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 从数据中提取出来的时间,如果没有则返回 null Long currentEventTime = ctx.timestamp(); // 定时器 TimerService timerService = ctx.timerService(); // 注册定时器 - 事件时间 timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器) System.out.println("当前的 key="+ctx.getCurrentKey()+"当前时间为 " + currentEventTime + ",注册了一个5s的定时器"); // 注册定时器 - 处理时间 // timerService.registerProcessingTimeTimer(); // 删除定时器 - 事件时间 // timerService.deleteEventTimeTimer(); // 删除定时器 - 处理时间 // timerService.deleteProcessingTimeTimer(); // 获取当前的处理时间 - 系统时间 long currentPs = timerService.currentProcessingTime(); // 获取当前水位线 long watermark = timerService.currentWatermark(); } /** * 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!! * @param timestamp 当前时间进展 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发"); } } ); process.print(); env.execute(); } }
输入数据:
s1,1,1 s1,3,3 s1,5,5 s1,8,8 s1,9,9
输出数据:
当前key=s1,当前时间为 1000,注册了一个5s的定时器 当前key=s1,当前时间为 3000,注册了一个5s的定时器 当前key=s1,当前时间为 5000,注册了一个5s的定时器 当前key=s1,当前时间为 8000,注册了一个5s的定时器 当前key=s1,当前时间为 9000,注册了一个5s的定时器 当前的 key= s1现在时间为 5000定时器触发
首先可以看到,每来一条数据都会注册一个定时器;我们还可以发现,当数据进展到 8s 的时候,按说我们设置的最多等待 3s ,而这里 8-3=5 按说应该达到触发条件了,可是却没有触发。其实对于触发器来说,它这个时候的时间其实是 (8s-3s-1ms=4999ms),其实离触发还差 1ms,所以当我们数据的事件时间为 9s 的时候,就会发现定时器终于被触发了。
注意:事件事件语义下,对于同一个 key 定时器只触发一次!!!对于相同 key 的数据,Flink 会根据 key 对定时器进行去重。
Flink(十)【处理函数】(2)https://developer.aliyun.com/article/1532230