Flink(十)【处理函数】(2)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十)【处理函数】

Flink(十)【处理函数】(1)https://developer.aliyun.com/article/1532227

2. 处理时间定时器

和上面一样,既然是处理时间的话,我们数据中带的事件时间就没用了,这里我们给每个来的数据定义一个五秒后的定时器:

public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);
                                return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                            }));
 
        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);
 
        // todo Process:keyed
        SingleOutputStreamOperator<String> process = sensorKs.process(
                /**
                 * KeyedProcessFunction<K, T, R>
                 * K: key 的类型
                 * T: data 的类型
                 * R: return 的类型
                 */
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次这个方法
                     * @param value 数据
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
 
                        // todo 1.获取定时器并注册
                        TimerService timerService = ctx.timerService();
 
                        // todo 1.1 注册事件时间定时器
                        // 事件时间 也就是当前数据中的 watermark,如果没有则返回 null
//                        Long currentEventTime = ctx.timestamp();
                        // 注册定时器 - 事件时间
//                        timerService.registerEventTimeTimer(5000L); // 事件时间进展到 5s 时触发闹钟(定时器)
//                        System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentEventTime + ",注册了一个5s的定时器");
 
                        // todo 1.2 注册处理时间定时器
                        // 处理时间 也就是当前的处理时间 - 系统时间
                        long currentPs = timerService.currentProcessingTime();
                        // 注册定时器 - 处理时间
                        timerService.registerProcessingTimeTimer(currentPs+5000L);  // 当处理时间为 当前时间+5s 触发闹钟
                        System.out.println("当前key="+ctx.getCurrentKey()+",当前时间为 " + currentPs + ",注册了一个5s后的定时器");
 
                        // 删除定时器 - 事件时间
//                        timerService.deleteEventTimeTimer();
                        // 删除定时器 - 处理时间
//                        timerService.deleteProcessingTimeTimer();
 
                        // 获取当前水位线
                        long watermark = timerService.currentWatermark();
                    }
 
                    // todo 2.定义触发定时器逻辑
                    /**
                     * 定义定时器(闹钟)触发时的响应逻辑,对于同一个key,onTimer只会被触发一次!!
                     * @param timestamp 当前时间进展
                     * @param ctx 上下文
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        System.out.println("当前的 key= "+ctx.getCurrentKey()+"现在时间为 " + timestamp + "定时器触发");
                    }
                }
        );
 
        process.print();
 
        env.execute();
    }
}

测试输入:

s1,1,1
s1,2,2
s1,3,3

输出:

当前key=s1,当前时间为 1703043149860,注册了一个5s后的定时器
当前key=s1,当前时间为 1703043151317,注册了一个5s后的定时器
当前key=s1,当前时间为 1703043152807,注册了一个5s后的定时器
当前的 key= s1现在时间为 1703043154860定时器触发
当前的 key= s1现在时间为 1703043156317定时器触发
当前的 key= s1现在时间为 1703043157807定时器触发

可以看到,处理时间语义下,对于同一个 key 它有可能会触发多次。

3. watermark 的滞后性
 @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
 
   // todo 1.获取定时器并注册
   TimerService timerService = ctx.timerService();
   // 获取当前水位线
   long watermark = timerService.currentWatermark();
   System.out.println("当前数据="+value+",当前watermark="+watermark);
}

输入:

s1,1,1
s1,5,5
s1,9,9

输出:

当前数据=WaterSensor{id='s1', ts=1, vc=1},当前watermark=-9223372036854775808
当前数据=WaterSensor{id='s1', ts=5, vc=5},当前watermark=-2001
当前数据=WaterSensor{id='s1', ts=9, vc=9},当前watermark=1999

可以看到,当我们的数据 {s1,1,1} 到达后,watermark 并不是 1-3-1ms = -2001 而是 watermark 的初始值 Inerger.MIN_VALUE,这是因为我们的水位线总是插入到数据后面的,而 processElement 方法一次只能处理一个数据,所以当数据  {s1,1,1}  处理完毕之后 watermark=-2001 才会进入 processElement 方法并更新 watermark。

定时器 - 总结
  1. 只有 KeyedStream 才有定时器
  2. 事件时间定时器,通过数据的 watermark 来触发
  1. 注意:watermark = 当前最大事件时间 - 最大等待时间 -1ms
  1. 在 processElement 中获取到的 watermark 是上一次的 watermark ,因为 watermark 是在数据后面进入 processElement 方法的。

2.3、窗口处理函数

       除了 KeyedProcessFunction , 另外一大类常用的处 理 函 数 ,就是基于窗口的ProcessWindowFunction 和 ProcessAllWindowFunction 了。

2.3.1、窗口处理函数的使用

       进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数(apply/process)、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

       窗口处理函数 ProcessWindowFunction 的使用与其他窗口函数类似 ,也是基于WindowedStream 直接调用方法就可以,只不过这时调用的是 .process()。就像我们之前窗口那一章节写的全窗口函数:

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());
 
        KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId);
 
        // todo 1. 指定窗口分配器:基于处理时间的滚动窗口
        WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
 
        // todo 2. 指定窗口函数:全窗口函数
        SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             *
             * @param key 分组的 key
             * @param context 上下文
             * @param elements 全窗口存的数据
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String start = sdf.format(new Date(startTs));
                String end = sdf.format(new Date(endTs));
                long size = elements.spliterator().estimateSize();
                out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString());
            }
        });
 
        process.print();
 
        env.execute();
    }
}

2.3.2、ProcessWindowFnction 解析

ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}
    public abstract class Context implements java.io.Serializable {...}
}

ProcessWindowFunction 依然是一个继承了 AbstractRichFunction 的抽象类,它有四个类型参数:

  • IN:input,数据流中窗口任务的输入数据类型。
  • OUT:output,窗口任务进行计算之后的输出数据类型。
  • KEY:数据中键 key 的类型。
  • W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口,W就是 TimeWindow。而内部定义的方法,跟我们之前熟悉的处理函数就有所区别了。

因为全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是.processElement(),而是改了.process()。方法包含四个参数。

  • key:窗口做统计计算基于的键,也就是之前 keyBy 用来分区的字段。
  • context:当前窗口进行计算的上下文,它的类型就是 ProcessWindowFunction内部定义的抽象类 Context。
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型。
  • out:用来发送数据输出计算结果的收集器,类型为 Collector。

可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合(一个迭代器对象)。而上下文context 所包含的内容也跟其他处理函数有所差别:

public abstract class Context implements java.io.Serializable {
 public abstract W window();
 public abstract long currentProcessingTime();
 public abstract long currentWatermark();
 public abstract KeyedStateStore windowState();
 public abstract KeyedStateStore globalState();
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化:

  • 这里不再持有TimerService 对象,只能通过 currentProcessingTime()和 currentWatermark()来获取当前时间,所以失去了设置定时器的功能;
  • 另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。

与此同时,也增加了一些获取其他信息的方法:

  • 可以通过.window()直接获取到当前的窗口对象,
  • 也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。

注意:这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前 key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前 key 的所有窗口有效。所以我们会发现,ProcessWindowFunction 中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。

这里有一个问题:没有了定时器,那窗口处理函数就失去了一个最给力的武器,如果我们希望有一些定时操作又该怎么做呢?其实仔细思考会发现,对于窗口而言,它本身的定义就包含了一个触发计算的时间点,其实一般情况下是没有必要再去做定时操作的。如果非要这么干,Flink也提供了另外的途径——使用窗口触发器(Trigger)。在触发器中也有一个TriggerContext,它可以起到类似 TimerService 的作用:获取当前时间、注册和删除定时器,另外还可以获取当前的状态。这样设计无疑会让处理流程更加清晰——定时操作也是一种“触发”,所以我们就让所有的触发操作归触发器管,而所有处理数据的操作则归窗口函数管。

至于另一种窗口处理函数 ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是 AllWindowedStream,相当于对没有 keyBy 的数据流直接开窗并调用.process()方法,但如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口:

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction());

2.4、应用案例 - Top N

       案例需求:实时统计一段时间内出现次数最多的水位。例如:统计最近10s内出现最多的两个水位,并且每5s更新一次。我们知道,这可以通过一个滑动窗口来实现,于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。其实这就是著名的“Top N”问题。

2.4.1、使用 ProcessAllWindowFunction

我们的数据类型 WaterSenor 的三个属性(id:传感器id,ts:事件时间,vc:水位高度)

/**
 * 案例: 不同水位出现的次数的 topN
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp)-> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);
                                return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                            }));
 
        // todo 思路1: 不做 keyBy 直接使用一个 hashMap<vc,count> 来累加 统一vc的count值
        // 窗口大小:10s,步长:5s
 
        sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .process(
                        new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() {
                            @Override
                            public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                // 定义一个hashMap
                                Map<Integer,Integer> map = new HashMap<>();
                                for (WaterSensor waterSensor : elements) {
                                    int vc = waterSensor.vc;
                                    map.put(vc,map.getOrDefault(vc,0)+1);
                                }
                                // 排序输出top2,利用 list 对map根据value进行排序
                                List<Tuple2<Integer, Integer>> list = new ArrayList<>();
                                for (Integer vc : map.keySet()) {
                                    list.add(Tuple2.of(vc,map.get(vc)));
                                }
                                list.sort(new Comparator<Tuple2<Integer, Integer>>() {
                                    @Override
                                    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
                                        return o2.f1-o1.f1;
                                    }
                                });
                                StringBuilder builder = new StringBuilder();
                                builder.append("===================\n");
                                // 防止越界,考虑list的size可能不够两个
                                for (int i = 0; i < Math.min(list.size(),2); i++) {
                                    Tuple2<Integer, Integer> tuple = list.get(i);
                                    builder.append("top").append(i+1).append(": ");
                                    builder.append(tuple.f0).append(" -> ");
                                    builder.append(tuple.f1).append("\n");
                                }
                                builder.append("窗口结束时间=").append(DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS"));
                                builder.append("\n");
                                out.collect(builder.toString());
                            }
                        }
                ).print();
 
        env.execute();
    }
 
}

注意:滑动窗口一定有第一个步长时被触发!到达第 1 个步长触发第 1 个窗口,到达第 2 个步长触发第 2 个窗口

上面我们定义了一个大小为 10 ,滑动步长为 5 的窗口,并且等待时间为 3

注意:

  1. 等待时间内来的数据如果不在窗口范围内并不会计入到窗口中!
  2. 窗口区间是左闭右开的!

所以,这里的窗口:

  • 第一个窗口:[-5,5)(注意:第一个窗口并不是[0,10)!!!)
  • 第一个窗口:[0,10)
  • ...

测试输入数据:

s1,1,1
s2,2,1
s3,3,2
s4,4,2
s5,5,2    // 窗口范围是左闭右开的,这里的 2 并不计数,这里达到第一个滑动步长,所以要等待3s
s6,6,1
s7,7,3
s8,8,3    // 此时才触发第一次计算
s10,10,2
s12,12,1
s13,13,4  // 达到第二个滑动步长,再次触发计算

输出结果:

===================
// [-5,5)的结果
top1: 1 -> 2
top2: 2 -> 2
窗口结束时间=1970-01-01 08:00:05.000
 
===================
// [0,10)的结果
top1: 1 -> 3
top2: 2 -> 3
窗口结束时间=1970-01-01 08:00:10.000

Flink(十)【处理函数】(3)https://developer.aliyun.com/article/1532231


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
67 27
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
41 0
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之开窗函数(WindowFunction)如何做开窗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 流计算
|
6月前
|
消息中间件 SQL 分布式计算
|
7月前
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL Oracle 关系型数据库
Flink的表值函数
【2月更文挑战第18天】Flink的表值函数
61 3
|
7月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
551 2
|
7月前
|
SQL Oracle 关系型数据库
Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
【2月更文挑战第17天】Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
146 1