Flink(十)【处理函数】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十)【处理函数】

Flink(十)【处理函数】(2)https://developer.aliyun.com/article/1532230

2.4.2、使用 KeyedProcessFunction

       上面我们没有按键区分,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为 1,在实际应用中是要尽量避免的,因为如果数据量很大,一个并行度的情况下机器受不了,而且全窗口函数是在最后窗口要关闭(滚动窗口)或者移动(滑动窗口)时才对有窗口内的数据进行计算,所以计算压力可想而知;所以 Flink 官方也并不推荐使用 AllWindowedStream 进行处理。另外,我们在全窗口函数中定义了 HashMap来统计 水位 的出现次数,计算过程是要先收集齐所有数据、然后再逐一遍历更新 HashMap,这显然不够高效。如果我们可以利用增量聚合函数的特性,每来一条数据就更新一次该水位出现的次数,那么到窗口触发计算时只需要做排序输出就可以了。

       所以优化的思路就是,先按照 vc 对数据进行 keyBy 分区,然后开窗进行增量聚合。所以我们先用增量聚合函数 AggregateFunction 对每个 vc 的次数进行统计,然后结合 ProcessWindowFunction 排序输出最终结果。

总结:

  1. 我们首先根据数据的 vc 进行 keyBy 分区,开窗(根据需求开一个滑动窗口,窗口大小10s,滑动步长5s)
  2. 使用聚合函数(aggregate)结合全窗口函数(ProcessWindowFunction),先把每个区的数据(相同 vc)的次数统计出来得到 count,然后使用全窗口函数把返回值封装成 Tuple3 (vc,count,endWindow)的格式,因为我们要根据不同窗口范围进行统计 TopN
  3. 上面最终的结果是一个普通的 DataStream,我们需要再根据窗口范围(上面Tuple3 中的 endWindow字段)进行一个 keyBy 分区,把每个范围的数据放到一起进行统一排序(这里使用 hashMap(key=windowEnd,value=Tuple3<vc,count,windowEnd>) 进行存储,使用 arrayList 进行排序)。
  4. 使用定时器,当 processElement 数据到齐后进行触发计算输出。
/**
 * 案例: 不同水位出现的次数的 topN
 */
public class KeyedProcessFunctionTopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        // todo 思路2: 使用 keyedProcessFunction 实现
 
        KeyedStream<WaterSensor, Integer> keyedStream = sensorDS.keyBy(sensor -> sensor.vc);
 
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // AggregateFunction 3个泛型参数: 输入类型,累加器类型,输出类型
                .aggregate(new AggregateFunction<WaterSensor, Integer, Integer>() {
                               // 累加器初始值
                               @Override
                               public Integer createAccumulator() {
                                   return 0;
                               }
 
                               // 累加过程: 直接+1,毕竟我们都是相同key
                               @Override
                               public Integer add(WaterSensor value, Integer accumulator) {
                                   return accumulator + 1;
                               }
 
                               // 累加结果直接返回
                               @Override
                               public Integer getResult(Integer accumulator) {
                                   return accumulator;
                               }
 
                               @Override
                               public Integer merge(Integer a, Integer b) {
                                   return null;
                               }
                           },      // ProcessWindowFunction的4个泛型参数: 输入类型、输出类型、键类型、窗口类型
                        // 这里由于我们后面要根据输出结果区分数据是来自哪个窗口的,所以使用了Tuple3<vc,count,windowEnd> 带上了结束窗口的标签
                        new ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>() {
                            @Override
                            public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
                                // 迭代器只有一条数据 所以 iterator.next() 就是它的全部数据了
                                Integer count = elements.iterator().next();
                                long windowEnd = context.window().getEnd();
                                out.collect(Tuple3.of(key, count, windowEnd));
                            }
                        });
        /**
         * windowAgg:SingOutputStreamOperator 的聚合结果:
         * vc=1,count=100,windowEnd=10s
         * vc=2,count=70,windowEnd=10s
         * vc=3,count=80,windowEnd=10s
         * 开窗聚合后,就变成了普通的数据流SingOutputStreamOperator(继承自 DataStream),所以我们自己给聚合结果打上了窗口结束的标签(windowEnd)
         */
 
        // 2. 按照窗口结束标签进行 keyBy 保证同一窗口时间范围的数据统一处理,之后再排序
        windowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();
 
        env.execute();
    }
    public static class TopN extends KeyedProcessFunction<Long,Tuple3<Integer,Integer,Long>,String>{
 
        private Map<Long,List<Tuple3<Integer,Integer,Long>>> map;
        private int threshold;
        public TopN(int threshold) {
            this.threshold = threshold;
            map = new HashMap<>();
        }
 
        // Tuple3<Integer, Integer, Long> value : Tuple3的元素类型: vc,count,windowEnd
        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            // 进入这个方法的都只是一条数据,要排序就需要都到齐才行
            // 1. 存到 hashMap
            Long windowEnd = value.f2;
            if (map.containsKey(windowEnd)){
                List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd);
                list.add(value);
            }else {
                List<Tuple3<Integer, Integer, Long>> list = new ArrayList<>();
                list.add(value);
                map.put(windowEnd,list);
            }
            // 注册一个定时器,windowEnd+1ms 触发
            // 因为同一个窗口范围应该同时输出,只不过是一条一条调用processElement方法,1ms就够执行完了
            ctx.timerService().registerEventTimeTimer(windowEnd + 1);
            // 这里 out 不用操作
        }
 
        // 定时器触发逻辑
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // 同一个窗口的计算结果攒齐了,需要开启排序和取 top N
            // 1. 排序
            Long windowEnd = ctx.getCurrentKey();
            List<Tuple3<Integer, Integer, Long>> list = map.get(windowEnd);
            list.sort((o1,o2) -> o2.f1-o1.f1);
            // 2. 取 topN
            StringBuilder builder = new StringBuilder();
            builder.append("===================\n");
            // 防止越界,考虑list的size可能不够两个
            for (int i = 0; i < Math.min(list.size(),threshold); i++) {
                Tuple3<Integer, Integer, Long> tuple = list.get(i);
                builder.append("top").append(i+1).append(": ");
                builder.append(tuple.f0).append(" -> ");
                builder.append(tuple.f1).append("\n");
                builder.append("窗口结束时间=").append(DateFormatUtils.format(tuple.f2, "yyyy-MM-dd HH:mm:ss.SSS"));
                builder.append("\n");
                builder.append("===================\n");
            }
            // list 用完就可以及时销毁了,节省空间
            list.clear();
 
            out.collect(builder.toString());
        }
    }
}

输入数据:

s1,1,1
s1,2,1
s1,5,2    
s1,8,3    
s1,9,1    // 第一个窗口范围 [-5,5),但是等待时间+3s所以8s进行输出,但是我们触发器+1ms所以9s才输出
s1,10,1
s1,13,2
s1,14,3    // 同理,第二个窗口范围 [0,10),14s才输出

输出结果:

===================
top1: 1 -> 2
窗口结束时间=1970-01-01 08:00:05.000
===================
 
===================
top1: 1 -> 3
窗口结束时间=1970-01-01 08:00:10.000
===================
top2: 2 -> 1
窗口结束时间=1970-01-01 08:00:10.000
===================

2.5、侧输出流(Side Output)

上下文对象 ctx 提供了侧输出流方法 output(OutTag,value) ,或者如果我们是 keyedStream.processElement() 的话,还可以在 .onTimer() 方法中调用上下文的.output()方法就可以了。我们之前用过好多次了。

/**
 * 案例: 不同水位出现的次数的 topN
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
//                                System.out.println("数据=" + sensor + ",recordTs=" + recordTimestamp);
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);
 
        SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(sensor -> sensor.id)
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    // KeyedProcessFunction泛型参数类型:key类型、输入类型、主流输出类型
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        // 使用侧输出流告警
                        if (value.vc > 10) {
                            ctx.output(warnTag, "当前水位=" + value.vc + ">阈值10!!!");
                        }
                        out.collect(value);
                    }
                });
 
        process.print("main");
        process.getSideOutput(warnTag).print("warn");
 
        env.execute();
    }
}

输入数据:

s1,1,1
s1,2,2
s1,8,8
s1,10,10
s1,12,12

输出结果:

main> WaterSensor{id='s1', ts=1, vc=1}
main> WaterSensor{id='s1', ts=2, vc=2}
main> WaterSensor{id='s1', ts=8, vc=8}
main> WaterSensor{id='s1', ts=10, vc=10}
warn> 当前水位=12>阈值10!!!
main> WaterSensor{id='s1', ts=12, vc=12}

总结

       这一块知识点特别挺多,与前面的窗口知识关联紧密,都必须熟悉掌握,对感兴趣的事并不能算是一种痛苦,享受知识越来越丰富的过程吧。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之开窗函数(WindowFunction)如何做开窗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
传感器 流计算
|
2月前
|
消息中间件 SQL 分布式计算
|
3月前
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL Oracle 关系型数据库
Flink的表值函数
【2月更文挑战第18天】Flink的表值函数
37 3
|
3月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
342 2
|
3月前
|
SQL Oracle 关系型数据库
Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
【2月更文挑战第17天】Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
70 1
|
3月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
109 0
Flink处理函数实战之四:窗口处理