Flink Process Function
ProcessFunction is a low-level stream-processing operator that gives access to the basic building blocks of all (acyclic) streaming applications:
Events (elements of the data stream)
State (fault tolerance and consistency)
Timers (event time and processing time)
ProcessFunction can be thought of as a FlatMapFunction that additionally provides access to keyed state and timers. It is invoked once for every event received in the input stream.
For fault-tolerant state, a ProcessFunction accesses keyed state through the RuntimeContext, just like other stateful functions do.
Timers let the application react to changes in processing time and event time. Every call to processElement() receives a Context object that gives access to the element's event-time timestamp and to the TimerService. The TimerService can register callbacks for event-time or processing-time instants that have not yet occurred. When a timer's time is reached, the onTimer() method is called; during that call, all state is again scoped to the key for which the timer was created, so the timer can manipulate keyed state.
To access keyed state and timers, the ProcessFunction must be applied on a KeyedStream:
stream.keyBy(...).process(new MyProcessFunction())
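As a minimal sketch of what such a MyProcessFunction could look like (the class body below is illustrative and not part of this article's project code): it keeps a per-key counter in ValueState obtained through the RuntimeContext and registers a processing-time timer through the Context's TimerService, i.e. exactly the two capabilities described above.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch: count events per key and schedule a callback 10 seconds later
public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // Keyed state is obtained through the RuntimeContext, like in other stateful functions
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        count = (count == null) ? 1L : count + 1;
        countState.update(count);

        // The Context exposes the TimerService; register a callback for a future processing-time instant
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10000L);

        out.collect(value + " -> " + count);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // When the timer fires, state is again scoped to the key that registered it
        out.collect("timer fired for key " + ctx.getCurrentKey() + ", count = " + countState.value());
    }
}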
Flink provides eight Process Functions for different kinds of streams:
ProcessFunction: the most basic one; it is the most freely customizable and can do almost anything
KeyedProcessFunction: the Process Function passed to process() after keyBy()
CoProcessFunction: the Process Function passed to process() after connect() (see the sketch after this list)
ProcessJoinFunction: the Process Function passed to process() after joining two streams
BroadcastProcessFunction: the Process Function passed to process() on a broadcast-connected stream
KeyedBroadcastProcessFunction: the Process Function passed to process() on a keyed broadcast-connected stream
ProcessWindowFunction: the Process Function passed to process() after window()
ProcessAllWindowFunction: the Process Function passed to process() after windowAll()
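Most of these variants follow the same pattern: a ProcessFunction-style class is handed to the process() call that matches the stream type. As an illustration of one variant that is not demonstrated later in this article, here is a minimal, hypothetical sketch (the class name and stream contents are made up) of a CoProcessFunction attached to two connect()-ed streams:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: merge a control stream and a data stream into one output stream
public class CoProcessExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> control = env.fromElements("a", "b");
        DataStream<Long> data = env.fromElements(1L, 2L, 3L);

        // connect() pairs the two streams; the CoProcessFunction receives elements from both
        control.connect(data)
                .process(new CoProcessFunction<String, Long, String>() {
                    @Override
                    public void processElement1(String value, Context ctx, Collector<String> out) {
                        out.collect("control: " + value);
                    }

                    @Override
                    public void processElement2(Long value, Context ctx, Collector<String> out) {
                        out.collect("data: " + value);
                    }
                })
                .print();

        env.execute();
    }
}

ProcessJoinFunction, BroadcastProcessFunction and the window variants are attached the same way, to the process() call on the corresponding joined, broadcast-connected, or windowed stream.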
Process Function
processElement(v: IN, ctx: Context, out: Collector[OUT]) is called for every element in the stream; results are emitted through the Collector. The Context gives access to the element's timestamp, the element's key, and the TimerService. The Context can also emit results to other streams (side outputs).
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) is a callback invoked when a previously registered timer fires. The timestamp parameter is the time for which the timer was set, and the Collector collects the output. The OnTimerContext, like the Context passed to processElement, provides contextual information, for example whether the timer fired in event time or processing time.
A simple example: using process() to implement a side output
package com.aikfk.flink.datastream.processfunction;

import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/23 3:04 下午
 */
public class ProcessSideOutPut {
    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from the socket and convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(data -> {
                    String[] split = data.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. Use a ProcessFunction to split the stream
        SingleOutputStreamOperator<WaterSensor> result =
                waterSensorDS.process(new ProcessFunction<WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context context, Collector<WaterSensor> collector) throws Exception {
                        // Take the water level
                        Integer vc = value.getVc();

                        // Split the stream based on the water level
                        if (vc >= 30) {
                            // Emit the record to the main stream
                            collector.collect(value);
                        } else {
                            // Emit the record to the side output
                            context.output(new OutputTag<Tuple2<String, Integer>>("SideOut") {
                            }, new Tuple2<>(value.getId(), vc));
                        }
                    }
                });

        // 4. Print the data
        result.print("主流");
        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(new OutputTag<Tuple2<String, Integer>>("SideOut") {
        });
        sideOutput.print("Side");

        // 5. Execute the job
        env.execute();
    }
}
Test data:
ws_001,1577844002,1
ws_001,1577844002,1
ws_001,1577844002,1
ws_001,1577844002,40
ws_001,1577844002,45
Output:
Side> (ws_001,1)
Side> (ws_001,1)
Side> (ws_001,1)
主流> WaterSensor{id='ws_001', ts=1577844002, vc=40}
主流> WaterSensor{id='ws_001', ts=1577844002, vc=45}
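A small note on the OutputTag: it is created as an anonymous subclass (the trailing { }) so that its generic type information is preserved. The example instantiates it twice with the same id, which works, but the more common idiom is to declare the tag once and reuse it for both ctx.output() and getSideOutput(); a short fragment using the same id and types as above:

// Declare once (e.g. before building the job):
final OutputTag<Tuple2<String, Integer>> sideOutTag =
        new OutputTag<Tuple2<String, Integer>>("SideOut") { };

// Inside processElement():
//     context.output(sideOutTag, new Tuple2<>(value.getId(), vc));

// When reading the side output:
//     DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(sideOutTag);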
A simple example: using process() to implement a timer
After processing an element, register a timer based on processing time (or event time); the callback is then executed at the specified time.
package com.aikfk.flink.datastream.processfunction;

import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/23 3:04 下午
 */
public class ProcessOnTimer {
    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from the socket and convert it to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(data -> {
                    String[] split = data.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. Use the timer facility of the ProcessFunction
        SingleOutputStreamOperator<WaterSensor> result =
                waterSensorDS.keyBy(WaterSensor::getId).process(new ProcessFunction<WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context context, Collector<WaterSensor> collector) throws Exception {
                        // Current processing time of this record
                        long ts = context.timerService().currentProcessingTime();
                        System.out.println(ts);

                        // Register a timer: current processing time + 5 seconds
                        context.timerService().registerProcessingTimeTimer(ts + 5000L);

                        // Emit the record
                        collector.collect(value);
                    }

                    // Action triggered when a registered timer fires
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception {
                        System.out.println("定时器触发:" + timestamp);
                    }
                });

        // 4. Print the data
        result.print();

        // 5. Execute the job
        env.execute();
    }
}
Output:
1616735657033
WaterSensor{id='ws_001', ts=1577844001, vc=45}
定时器触发:1616735662033
1616735662981
WaterSensor{id='ws_001', ts=157784400, vc=67}
定时器触发:1616735667981
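The TimerService can also cancel a timer that has not fired yet, via deleteProcessingTimeTimer() / deleteEventTimeTimer(). The following is a hedged sketch (the class name and the reset-on-every-element behavior are illustrative, not part of the article's code, though it reuses the WaterSensor bean from the examples above): every new element for a key cancels that key's pending timer and schedules a new one, so the callback fires only 5 seconds after the last element seen for the key.

import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative: reset the per-key timer on every element
public class ResettingTimerFunction extends KeyedProcessFunction<String, WaterSensor, WaterSensor> {

    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("pendingTimer", Long.class));
    }

    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
        // Cancel the previously registered timer for this key, if any
        Long previous = pendingTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }

        // Register a new timer 5 seconds from now and remember its trigger time
        long triggerTime = ctx.timerService().currentProcessingTime() + 5000L;
        ctx.timerService().registerProcessingTimeTimer(triggerTime);
        pendingTimer.update(triggerTime);

        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception {
        pendingTimer.clear();
        System.out.println("定时器触发:" + timestamp + " (key=" + ctx.getCurrentKey() + ")");
    }
}

It would be plugged in exactly like the anonymous function above: waterSensorDS.keyBy(WaterSensor::getId).process(new ResettingTimerFunction()).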
Process Function project example
For every incoming element:
update its state
register a callback to be invoked at some time in the future
When that time arrives:
check whether a condition is met and take the corresponding action, for example emitting elements
Requirements:
keep a count for every incoming key
if a given key receives no element for 1 second (1000 ms, event time), emit the key/count pair.
General approach:
store the count, the key, and the timestamp of the last update in a ValueState, which is implicitly scoped by key;
for each record:
increment the counter and update the last-modified timestamp
register a 1-second (1000 ms) timer starting from the current event time
when the timer fires:
compare the last-modified time stored with the count against the callback's event-time timestamp
if they match (i.e. the key was not updated within that second), emit the key/count pair
Code:
package com.aikfk.flink.base;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<String> {

    @Override
    public void cancel() {
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159390000",
                "a,1575159402000",
                "b,1575159427000",
                "c,1575159382000",
                "b,1575159407000",
                "a,1575159302000"
        };

        for (int k = 0; k < datas.length; k++) {
            Thread.sleep(100);
            ctx.collect(datas[k]);
        }
    }
}
package com.aikfk.flink.datastream.processfunction;

import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.Tools;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/26 1:32 下午
 */
public class ProcessFunction {
    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the data stream; event-time timers require watermarks,
        //    so assignTimestampsAndWatermarks must be implemented
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource())
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] words = s.split(",");
                        return new Tuple2<>(words[0], Long.parseLong(words[1]));
                    }
                })
                // 3. Generate watermarks
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> input, long l) {
                                return input.f1;
                            }
                        }))
                // 4. keyBy
                .keyBy(key -> key.f0)
                // Implement the process function
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {

                    private ValueState<CountWithTimestamp> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<CountWithTimestamp>("mystate", CountWithTimestamp.class));
                    }

                    @Override
                    public void processElement(Tuple2<String, Long> value, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
                        CountWithTimestamp currentElement = state.value();
                        if (currentElement == null) {
                            currentElement = new CountWithTimestamp();
                            currentElement.key = value.f0;
                        }

                        // Increment the count for this key and record the last modification time
                        currentElement.count++;
                        currentElement.lastModified = context.timestamp();
                        state.update(currentElement);

                        // Register an event-time timer: last modification + 1000 ms
                        context.timerService().registerEventTimeTimer(currentElement.lastModified + 1000);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                        CountWithTimestamp result = state.value();

                        System.out.println(ctx.getCurrentKey() + " timestamp : " + Tools.getMsToDate(timestamp)
                                + " ctx.timestamp :" + Tools.getMsToDate(ctx.timestamp())
                                + " lastModified:" + Tools.getMsToDate(result.lastModified));

                        // Only emit if the state has not been updated since this timer was registered
                        if ((result.lastModified + 1000) == timestamp) {
                            out.collect(new Tuple2<>(result.key, result.count));
                        }
                    }
                });

        dataStream.print();

        env.execute("Window WordCount");
    }

    private static class CountWithTimestamp {
        private String key;
        private long count;
        private long lastModified;
    }
}
Output:
a timestamp : 2019-12-01 08:15:03.000 ctx.timestamp :2019-12-01 08:15:03.000 lastModified:2019-12-01 08:15:02.000
(a,3)
c timestamp : 2019-12-01 08:16:23.000 ctx.timestamp :2019-12-01 08:16:23.000 lastModified:2019-12-01 08:16:22.000
(c,1)
a timestamp : 2019-12-01 08:16:31.000 ctx.timestamp :2019-12-01 08:16:31.000 lastModified:2019-12-01 08:15:02.000
a timestamp : 2019-12-01 08:16:43.000 ctx.timestamp :2019-12-01 08:16:43.000 lastModified:2019-12-01 08:15:02.000
b timestamp : 2019-12-01 08:16:48.000 ctx.timestamp :2019-12-01 08:16:48.000 lastModified:2019-12-01 08:16:47.000
(b,2)
b timestamp : 2019-12-01 08:17:08.000 ctx.timestamp :2019-12-01 08:17:08.000 lastModified:2019-12-01 08:16:47.000
Timer firings where lastModified + 1000 no longer equals the firing timestamp (the key's state was modified again before the timer fired) only print the diagnostic line and emit nothing, which is why only (a,3), (c,1) and (b,2) appear as results.