Flink之ProcessFunction

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应 用场景下,极为重要。例如 MapFunction 这样的map 转换算子就无法访问时间戳或者当前事 件的事件时间。基于此,DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、 watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的 window 函数和 转换算子无法实现)。例如,Flink SQL 就是使用Process Function 实现的。

Flink Process Function


15.png


ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块:


事件 (数据流元素)

状态 (容错和一致性)

定时器 (事件时间和处理时间)

ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。每在输入流中接收到一个事件,就会调用来此函数来处理。


对于容错的状态,ProcessFunction 可以通过 RuntimeContext 访问 KeyedState,类似于其他有状态函数访问 KeyedState。


定时器可以对处理时间和事件时间的变化做一些处理。每次调用 processElement() 都可以获得一个 Context 对象,通过该对象可以访问元素的事件时间戳以及 TimerService。TimerService 可以为尚未发生的事件时间/处理时间实例注册回调。当定时器到达某个时刻时,会调用 onTimer() 方法。在调用期间,所有状态再次限定为定时器创建的键,允许定时器操作 KeyedState。


如果要访问 KeyedState 和定时器,那必须在 KeyedStream 上使用 ProcessFunction。

stream.keyBy(...).process(new MyProcessFunction())

针对不同得流Flink提供了8个Process Function


ProcessFunction :最原始,自定义程度高,什么都能做

KeyedProcessFunction:keyby后使用得process中传入得Process Function

CoProcessFunction:connect后使用得process中传入得Process Function

ProcessJoinFunction:两条流Join连接后使用得process中传入得Process Function

BroadcastProcessFunction:广播流使用得process中传入得Process Function

KeyedBroadcastProcessFunction:keyby广播流使用得process中传入得Process Function

ProcessWindowFunction:开窗后使用得process中传入得Process Function

ProcessAllWindowFunction:AllWindow后使用得process中传入得Process Function


Process Function


processElement(v: IN, ctx: Context, out:Collector[OUT]),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(sideoutputs)。

onTimer(timestamp: Long, ctx: OnTimerContext, out:Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector

为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

简单使用proceess实现测输出流的功能

package com.aikfk.flink.datastream.processfunction;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/23 3:04 下午
 */
public class ProcessSideOutPut {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(data -> {
                    String[] split = data.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });
        //3.使用ProcessFunction将数据分流
        SingleOutputStreamOperator<WaterSensor> result = waterSensorDS.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context context, Collector<WaterSensor> collector) throws Exception {
                //取出水位线
                Integer vc = value.getVc();
                //根据水位线高低,分流
                if (vc >= 30) {
                    //将数据输出至主流
                    collector.collect(value);
                } else {
                    //将数据输出至侧输出流
                    context.output(new OutputTag<Tuple2<String, Integer>>("SideOut") {
                               },
                            new Tuple2<>(value.getId(), vc));
                }
            }
        });
        //4.打印数据
        result.print("主流");
        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(new OutputTag<Tuple2<String, Integer>>("SideOut") {
        });
        sideOutput.print("Side");
        //5.执行任务
        env.execute();
    }
}

测试数据:

ws_001,1577844002,1
ws_001,1577844002,1
ws_001,1577844002,1
ws_001,1577844002,40
ws_001,1577844002,45

运行结果:

Side> (ws_001,1)
Side> (ws_001,1)
Side> (ws_001,1)
主流> WaterSensor{id='ws_001', ts=1577844002, vc=40}
主流> WaterSensor{id='ws_001', ts=1577844002, vc=45}

简单使用proceess实现定时器的功能

基于处理时间或者时间时间处理过一个元素之后, 注册一个定时器, 然后 指定的时间执行。

package com.aikfk.flink.datastream.processfunction;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/23 3:04 下午
 */
public class ProcessOnTimer {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(data -> {
                    String[] split = data.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });
        //3.使用ProcessFunction的定时器功能
        SingleOutputStreamOperator<WaterSensor> result = waterSensorDS.keyBy(WaterSensor::getId).process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context context, Collector<WaterSensor> collector) throws Exception {
                //获取当前数据的处理时间
                long ts = context.timerService().currentProcessingTime();
                System.out.println(ts);
                //注册定时器,当前数据的处理时间 + 5秒
                context.timerService().registerProcessingTimeTimer(ts + 5000L);
                //输出数据
                collector.collect(value);
            }
           //注册的定时器响起,触发动作
           @Override
           public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception {
               System.out.println("定时器触发:" + timestamp);
           }
        });
        //4.打印数据
        result.print();
        //5.执行任务
        env.execute();
    }
}


运行结果:

1616735657033
WaterSensor{id='ws_001', ts=1577844001, vc=45}
定时器触发:1616735662033
1616735662981
WaterSensor{id='ws_001', ts=157784400, vc=67}
定时器触发:1616735667981

Process Function项目案例


对于每一个接入的数据元素:

更新数据状态

注册未来某一时间需要调用的callback回调函数

当某一时间到来后:

检查条件是否满足,并执行对应的行为,例如输出数据元素等

功能需求:


记录每个传入的Key的counts数量

如果指定的Key在最近100ms (Event Time)没有接收到任何 Element,则输出key/ count键值对。

大致思路:


存储count值,key以及最后更新的TimeStamp到 ValueState 中, ValueState 由 key隐含定义;

对于每条记录:

更新计数器并修改最后的时间戳

注册一个100ms timer计时器,起始时间从当前的EventTime开始

Times被回调时:

检查存储计数的最后修改时间与回调的事件时间TimeStamp

如果匹配则发送键/计数键值对(即在100ms内没有更新)

开发代码:

package com.aikfk.flink.base;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public  class MySource implements SourceFunction<String> {
    @Override
    public void cancel() {
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159390000",
                "a,1575159402000",
                "b,1575159427000",
                "c,1575159382000",
                "b,1575159407000",
                "a,1575159302000"
        };
        for (int k = 0; k < datas.length; k++) {
            Thread.sleep(100);
            ctx.collect(datas[k]);
        }
    }
}
package com.aikfk.flink.datastream.processfunction;
import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.Tools;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/26 1:32 下午
 */
public class ProcessFunction {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2.生成dataStream1,window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        })
        // 3.生成watermark
        .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String,Long> input, long l) {
                                return input.f1;
                            }
                        }))
        // 4.keyby
        .keyBy(key -> key.f0)
        // 实现processfunction方法
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
            private ValueState<CountWithTimestamp> state;
            @Override
            public void open(Configuration parameters) throws Exception {
                state = getRuntimeContext().getState(
                        new ValueStateDescriptor<CountWithTimestamp>("mystate",CountWithTimestamp.class));
            }
            @Override
            public void processElement(Tuple2<String, Long> value,
                                       Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
                CountWithTimestamp currentElement = state.value();
                if (currentElement == null){
                    currentElement = new CountWithTimestamp();
                    currentElement.key = value.f0;
                }
                // 对key进行累加
                currentElement.count ++;
                currentElement.lastModified = context.timestamp();
                state.update(currentElement);
                // 注册定时器
                context.timerService().registerEventTimeTimer(currentElement.lastModified + 1000);
            }
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                CountWithTimestamp result = state.value();
                System.out.println(ctx.getCurrentKey()+"   timestamp : "+ Tools.getMsToDate(timestamp) +
                        " ctx.timestamp :"+ Tools.getMsToDate(ctx.timestamp())+
                        " lastModified:"+Tools.getMsToDate(result.lastModified));
                if ((result.lastModified + 1000) == timestamp){
                    out.collect(new Tuple2<>(result.key, result.count));
                }
            }
        });
        dataStream.print();
        env.execute("Window WordCount");
    }
    private static class CountWithTimestamp{
        private String key;
        private long count;
        private long lastModified;
    }
}

运行结果:

a   timestamp : 2019-12-01 08:15:03.000 ctx.timestamp :2019-12-01 08:15:03.000 lastModified:2019-12-01 08:15:02.000
(a,3)
c   timestamp : 2019-12-01 08:16:23.000 ctx.timestamp :2019-12-01 08:16:23.000 lastModified:2019-12-01 08:16:22.000
(c,1)
a   timestamp : 2019-12-01 08:16:31.000 ctx.timestamp :2019-12-01 08:16:31.000 lastModified:2019-12-01 08:15:02.000
a   timestamp : 2019-12-01 08:16:43.000 ctx.timestamp :2019-12-01 08:16:43.000 lastModified:2019-12-01 08:15:02.000
b   timestamp : 2019-12-01 08:16:48.000 ctx.timestamp :2019-12-01 08:16:48.000 lastModified:2019-12-01 08:16:47.000
(b,2)
b   timestamp : 2019-12-01 08:17:08.000 ctx.timestamp :2019-12-01 08:17:08.000 lastModified:2019-12-01 08:16:47.000




相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
并行计算 API 流计算
Flink之处理函数 (ProcessFunction)1
Flink之处理函数 (ProcessFunction)
568 0
|
Java 程序员 网络安全
|
消息中间件 存储 程序员
|
存储 缓存 API
Flink之处理函数 (ProcessFunction)2
Flink之处理函数 (ProcessFunction)
458 0
|
存储 缓存 分布式计算
Flink / Scala - ProcessFunction 之间共用缓存测试
Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式: A.ValueState 缓存 B.HashMapCache 缓存 C.RedisCahce 缓存...
413 0
Flink / Scala - ProcessFunction 之间共用缓存测试
|
Java 程序员 网络安全
|
消息中间件 存储 程序员
|
传感器 SQL 监控
【Flink】(六)ProcessFunction API(底层 API)
【Flink】(六)ProcessFunction API(底层 API)
315 0
|
12月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。