Flink(十)【处理函数】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十)【处理函数】

前言

       冬天学习成本太高了,每天冻得要死,自习室人满为患,确实是辛苦。学校基本的硬件条件差的一批(图书馆贼小贼偏僻、老教室暖气还没有地板热、空教室还得自己一个一个挨着找),个体无法改变环境只能顺应了(艹,受不了了,去校长信箱轰tnd)。

       今天学习 Flink 处理函数,学完这一块就剩状态管理、容错机制和 Flink SQL 了,坚持坚持。学完再好好回顾回顾,最后就是把剩余的一些框架(Kafka、Flume等)补齐了。

1、处理函数

       之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于 DataStream 进行转换的;所以可以统称为 DataStream API,这也是 Flink 编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API,DataStream API 只是其中之一,如图:

       在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑(我们之前可以从 process 函数中获得 上下文对象 ctx、实现侧输出流等),所以这一层接口就被叫作“处理函数”(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。就相当于我们 Spark 中的 RDD 编程,它是最底层的东西,所以一些上层无法实现的,它都可以实现。

       所以,总结一句话就是:只要是现有的算子实现不了的,直接上 process 即可。

1.1、基本处理函数

1.1.1、处理函数的功能和基本使用

       我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如 map 算子,我们实现的 MapFunction 中,只能获取到当前的数据,定义它转换之后的形式;而像窗口聚合这样的复杂操作,AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式)。另外我们还介绍过富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度、任务名称之类的运行时信息。

       但是无论哪种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。在定义生成规则之后,水位线会源源不断地产生,像数据一样在任务间流动,可我们却不能像数据一样去处理它,因为跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

       这就需要我们使用——处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

       处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑(我们之前通过给 process 方法传入一个实现了 ProcessFunction 抽象类的匿名内部类来实现侧输出流、通过给 process 方法传入一个实现了 CoProcessFunction 抽象类的匿名内部类来实现合流 )。

stream.process(new MyProcessFunction());

       这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

1.1.2、ProcessFunction 解析

       在源码中我们可以看到,抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。

       内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

package org.apache.flink.streaming.api.functions;
 
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
 
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
 
    // ...
   
    // 核心处理逻辑
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
 
    // 定时器
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 
    // ...
}
1. 抽象方法.processElement()

        用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义的。

  • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。

Context 抽象类定义如下:

    public abstract class Context {
 
        
        public abstract Long timestamp();
 
        
        public abstract TimerService timerService();
 
        
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
  • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

通过几个参数的分析不难发现,ProcessFunction 可以轻松实现 flatMap 这样的基本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。自定义状态的具体实现,我们会在后面学到 “状态管理” 的时候再说。

2. 非抽象方法.onTimer()

       用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。

       打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。        

       与.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

       既然有.onTimer()方法做定时触发,我们用 ProcessFunction 也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说 ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。我们也可以看到,处理函数都是基于事件触发的。水位线就如同插入流中的一条数据一样;只不过处理真正的数据事件调用是.processElement()方法,而处理水位线事件调用的是.onTimer()。

       这里需要注意的是,上面的.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文 ctx 中的定时服务。在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数,它们之间还是有一些微小的区别的。接下来我们就介绍一下处理函数的分类。

1.1.3、处理函数的分类

1. 处理函数的功能和使用

       Flink 中的处理函数其实是一个大家族,ProcessFunction 只是其中一员。我们知道,DataStream 在调用一些转换方法之后,有可能生成新的流类型;例如调用 .keyBy() 之后得到 KeyedStream,进而再调用.window()之后得到 WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层 API,可彼此之间也会有所差异。Flink 提供了 8 个不同的处理函数:

  1. ProcessFunction。最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
  2. KeyedProcessFunction。对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream。
  3. ProcessWindowFunction。开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process() 时作为参数传入。
  4. ProcessAllWindowFunction。同样是开窗之后的处理函数(没有 keyby 的话传这个),基于 AllWindowedStream 调用.process()时作为参数传入。
  5. CoProcessFunction。合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
  6. ProcessJoinFunction。间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
  7. BroadcastProcessFunction。广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
  8. KeyedBroadcastProcessFunction。按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

接下来,我们就对经常用到的 KeyedProcessFunction 和 ProcessWindowFunction 的具体用法展开详细说明。

2.1、按键分区处理函数(KeyedProcessFunction)

       在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy 算子对数据流进行“按键分区”,得到一个 KeyedStream。也就是指定一个键(key),按照它的哈希值(hash code)将数据分成不同的“组”,然后分配到不同的并行子任务上执行计算;这相当于做了一个逻辑分流的操作,从而可以充分利用并行计算的优势实时处理海量数据。

       另外我们在上节中也提到,只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy 分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction,最基本的 ProcessFunction 反而使用率没那么高。接下来我们就先从定时服务(TimerService)入手,详细讲解 KeyedProcessFunction 的用法

2.1.1、定时器(Timer)和定时服务(TimerService)

       KeyedProcessFunction 的一个特色,就是可以灵活地使用定时器。定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。

       定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一个 TimerService 对象。TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:

public abstract TimerService timerService();
TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线(事件时间)
long currentWatermark();
// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);

       六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。

       需要注意:尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间

       对于处理时间和事件时间这两种类型的定时器,TimerService 内部会用一个优先队列将它们的时间戳(timestamp)保存起来,排队等待执行。可以认为,定时器其实是 KeyedStream上处理算子的一个状态,它以时间戳作为区分。所以 TimerService 会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个 key 注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。

       基于 KeyedStream 注册定时器时,会传入一个定时器触发的时间戳,这个时间戳的定时器对于每个 key 都是有效的。这样,我们的代码并不需要做额外的处理,底层就可以直接对不同key 进行独立的处理操作了。

       利用这个特性,有时我们可以故意降低时间戳的精度,来减少定时器的数量,从而提高处理性能。比如我们可以在设置定时器时只保留整秒数,那么定时器的触发频率就是最多 1 秒一次。

1. 事件时间定时器

我们通过 Socket 接收一个无序的数据流(WaterSensor类型),并指定允许迟到 3s,然后进行一个 keyBy ,之后得到 KeyedStream,我们给它定义一个定时器(当事件时间进展到 5s 的时候触发一次):

public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);
                                return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                            }));
 
        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);
 
        // todo Process:keyed
        SingleOutputStreamOperator<String> process = sensorKs.process(
                /**
                 * KeyedProcessFunction<K, T, R>
                 * K: key 的类型
                 * T: data 的类型
                 * R: return 的类型
                 */
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次这个方法
                     * @param value 数据
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // 从数据中提取出来的时间,如果没有则返回 null
                        Long currentEventTime = ctx.timestamp();
                        // 定时器
                        TimerService timerService = ctx.timerService();
                        // 注册定时器 - 事件时间
                        timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器)
                        System.out.println("当前的 key="+ctx.getCurrentKey()+"当前时间为 " + currentEventTime + ",注册了一个5s的定时器");
                        // 注册定时器 - 处理时间
//                        timerService.registerProcessingTimeTimer();
                        // 删除定时器 - 事件时间
//                        timerService.deleteEventTimeTimer();
                        // 删除定时器 - 处理时间
//                        timerService.deleteProcessingTimeTimer();
 
                        // 获取当前的处理时间 - 系统时间
                        long currentPs = timerService.currentProcessingTime();
                        // 获取当前水位线
                        long watermark = timerService.currentWatermark();
                    }
 
                    /**
                     * 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!
                     * @param timestamp 当前时间进展
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发");
                    }
                }
        );
 
        process.print();
 
        env.execute();
    }
}

输入数据:

s1,1,1
s1,3,3
s1,5,5
s1,8,8
s1,9,9

输出数据:

当前key=s1,当前时间为 1000,注册了一个5s的定时器
当前key=s1,当前时间为 3000,注册了一个5s的定时器
当前key=s1,当前时间为 5000,注册了一个5s的定时器
当前key=s1,当前时间为 8000,注册了一个5s的定时器
当前key=s1,当前时间为 9000,注册了一个5s的定时器
当前的 key= s1现在时间为 5000定时器触发

       首先可以看到,每来一条数据都会注册一个定时器;我们还可以发现,当数据进展到 8s 的时候,按说我们设置的最多等待 3s ,而这里 8-3=5 按说应该达到触发条件了,可是却没有触发。其实对于触发器来说,它这个时候的时间其实是 (8s-3s-1ms=4999ms),其实离触发还差 1ms,所以当我们数据的事件时间为 9s 的时候,就会发现定时器终于被触发了。

注意:事件事件语义下,对于同一个 key 定时器只触发一次!!!对于相同 key 的数据,Flink 会根据 key 对定时器进行去重。

Flink(十)【处理函数】(2)https://developer.aliyun.com/article/1532230

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
71 27
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
41 0
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之开窗函数(WindowFunction)如何做开窗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 流计算
|
6月前
|
传感器 流计算
|
7月前
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL Oracle 关系型数据库
Flink的表值函数
【2月更文挑战第18天】Flink的表值函数
61 3
|
7月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
554 2
|
7月前
|
SQL Oracle 关系型数据库
Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
【2月更文挑战第17天】Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
147 1