day04_Flink Advanced API
Today's Goals
- Flink's four cornerstones
- Flink window operations (Window)
- Flink time (Time)
- Flink watermark mechanism (Watermark)
- Flink state management: keyed state and operator state
Flink's Four Cornerstones
- Checkpoint: distributed consistency; prevents data loss and enables recovery after failure; stores a snapshot of the global state, persisted to a distributed file system such as HDFS
- State: divided into Managed state and Raw state; in terms of data structures, managed keyed state offers ValueState, ListState, MapState, and BroadcastState
- Time: EventTime (event time), IngestionTime (ingestion time), ProcessingTime (processing time)
- Window: time windows and count windows; TimeWindow, CountWindow, SessionWindow
Window Operations
- Why windows are needed
Streaming data is dynamic and unbounded; a window carves out a range so that the unbounded stream can be converted into bounded, static slices that can be computed over.
Window Classification
- By time
  - Window granularity: one day, one hour, one minute
  - Most commonly used: the tumbling window and the sliding window (see the API sketch after this list)
    - Tumbling window: the slide equals the window size, so windows never overlap
    - Sliding window: the slide is smaller than the window size, so adjacent windows overlap
  - Session window: a window closes after a gap of inactivity
- By count
  - Tumbling count window
  - Sliding count window
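To make the classification concrete, here is a minimal sketch mapping each window type above to its DataStream API assigner. This is a hedged sketch, not part of the original demos: the socket host/port and the "key,value" input format are borrowed from the examples that follow, and the class name is illustrative.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAssignerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // lines such as "9,3" arrive on a socket, as in the demos below
        KeyedStream<Tuple2<String, Integer>, String> keyed = env
                .socketTextStream("192.168.88.161", 9999)
                .map(line -> {
                    String[] kv = line.split(",");
                    return Tuple2.of(kv[0], Integer.parseInt(kv[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambda needs an explicit tuple type
                .keyBy(t -> t.f0);

        // tumbling time window: size == slide, no overlap
        keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();
        // the remaining assigners, one per window type above (uncomment one at a time):
        //keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1); // sliding time window
        //keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1);            // session window, 30s gap
        //keyed.countWindow(5).sum(1);    // tumbling count window
        //keyed.countWindow(5, 3).sum(1); // sliding count window
        env.execute();
    }
}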
How to Use
Window Examples
Time window requirements
- Every 5 seconds, count the cars that passed each intersection's traffic light in the last 5 seconds -- time-based tumbling window
- Every 5 seconds, count the cars that passed each intersection's traffic light in the last 10 seconds -- time-based sliding window
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Date 2021/6/18 15:00
 * Steps:
 * 1. Convert the string "9,3" into a CartInfo
 * 2. Apply a tumbling window and a sliding window
 * 3. Key by sensor id and aggregate
 * 4. Print the result
 * 5. Execute the environment
 */
public class WindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1. env: create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. read from the socket source
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3. convert "9,3" into CartInfo(9, 3)
        DataStream<CartInfo> mapDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        //4. key by sensorId, assign the window, and sum within it
        //   Tumbling (滚动) Processing (处理) TimeWindows (时间窗口)
        //Requirement 1: every 5 seconds, count the cars that passed each traffic light in the last 5 seconds
        SingleOutputStreamOperator<CartInfo> result1 = mapDS.keyBy(t -> t.sensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        //Requirement 2: every 5 seconds, count the cars that passed each traffic light in the last 10 seconds
        SingleOutputStreamOperator<CartInfo> result2 = mapDS.keyBy(t -> t.sensorId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");
        //5. print
        //result1.print();
        result2.print();
        //6. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // traffic light id
        private Integer count;   // number of cars that passed this light
    }
}
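To try either variant, start a socket server on the host the code points at (192.168.88.161 here; substitute your own) with `nc -lk 9999`, then type lines such as `9,3` or `7,1`. With the tumbling window each sensor's counts are summed and emitted once every 5 seconds; with the 10 s / 5 s sliding window every record falls into two consecutive windows, so it contributes to two successive sums.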
Count window requirements
- Requirement 1: over the most recent 5 messages, count the cars passing each intersection; fire each time the same key has occurred 5 times -- count-based tumbling window
- Requirement 2: over the most recent 5 messages, count the cars passing each intersection; fire every 3 occurrences of the same key -- count-based sliding window
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/18 15:46
 * Desc count window demo: tumbling and sliding count windows
 */
public class CountWindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1. env: create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. read from the socket source
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3. convert "9,3" into CartInfo(9, 3), reusing the POJO from WindowDemo01
        DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
            @Override
            public WindowDemo01.CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        //Requirement 1: over the last 5 messages, count the cars per intersection;
        //fire once the same key has occurred 5 times -- count-based tumbling window
        //countWindow(long size) / countWindow(long size, long slide)
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        //Requirement 2: over the last 5 messages, count the cars per intersection;
        //fire every 3 occurrences of the same key -- count-based sliding window
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");
        //print
        //result1.print();
        result2.print();
        //execute
        env.execute();
    }
}
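A note on count-window semantics: count windows are tracked per key, not globally. `countWindow(5)` fires once 5 elements with the same key have accumulated and then starts over; `countWindow(5, 3)` uses the slide (3) as the firing interval and the size (5) as the maximum number of retained elements, so the first firing for a key covers its first 3 elements and later firings cover at most its last 5.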
Flink - Time and Watermark
Time
- EventTime: when the event actually occurred; IngestionTime: when Flink ingested it; ProcessingTime: when the operator processes it
Watermark Mechanism
- Primarily addresses late and out-of-order data
- Watermark (itself a timestamp) = maximum event time seen so far - maximum allowed lateness
- Window trigger condition (a worked example follows this list):
watermark >= window end time => the window computation fires
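A worked example: with a maximum out-of-orderness of 3 s, an event stamped 10:00:10 advances the watermark to 10:00:07 (10:00:10 - 3 s). The event-time window [10:00:00, 10:00:05) therefore fires only once an event with a timestamp of at least 10:00:08 has been seen, because only then does the watermark reach the window's end time of 10:00:05.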
Requirement
Order data arrives in the format (orderId, userId, event-time timestamp, order amount).
Every 5 s, compute each user's total order amount within the 5-second window,
and add a Watermark to tolerate a bounded amount of late, out-of-order data (at most 3 seconds of delay).
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Date 2021/6/18 16:54
 * Desc watermark demo: 5s event-time tumbling window with 3s bounded out-of-orderness
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //older versions defaulted to ProcessingTime; recent versions default to EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2. Source: emit Order(orderId:String, userId:Integer, money:Integer, eventTime:Long)
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //simulate lateness: event time = current time - (0..4 s at random)
                            System.currentTimeMillis() - rm.nextInt(5) * 1000
                    ));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3. Transformation
        //tell Flink to compute on event time (the default in recent versions)
        DataStream<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
                )
                //the watermark strategy allows at most 3s of lateness and tells Flink which field
                //holds the event time: watermark = current max event time - max allowed lateness
                //from here on the stream carries watermarks, so event-time windows can be applied
                //every 5s, compute each user's total order amount within the 5s tumbling window
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4. Sink
        result.print();
        //5. execute
        env.execute();
    }

    //order POJO
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
- Implementing the watermark mechanism with a custom generator written against Flink's interface (a sketch follows)
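The demo above uses the built-in forBoundedOutOfOrderness strategy. For a hand-written watermark, Flink 1.11+ exposes the WatermarkGenerator interface; below is a minimal sketch (class name illustrative) that reproduces the same 3-second bounded-out-of-orderness behavior for the Order POJO from the demo.

package cn.itcast.flink.basestone;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

//a periodic watermark generator: track the largest event time seen so far and
//emit watermark = max event time - allowed out-of-orderness - 1 ms
public class OrderWatermarkGenerator implements WatermarkGenerator<WatermarkDemo01.Order> {
    private static final long MAX_OUT_OF_ORDERNESS = 3000L; // 3 s, matching the demo
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS + 1;

    @Override
    public void onEvent(WatermarkDemo01.Order event, long eventTimestamp, WatermarkOutput output) {
        //called for every record: remember the maximum event time
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        //called at the auto-watermark interval (200 ms by default)
        output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS - 1));
    }
}

It would plug into the demo by replacing the forBoundedOutOfOrderness line with WatermarkStrategy.forGenerator(ctx -> new OrderWatermarkGenerator()).withTimestampAssigner((order, ts) -> order.getEventTime()).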
- One order record is generated per second; totals are aggregated per user over 5-second tumbling windows, with a maximum allowed lateness of 3 seconds. Data that arrives more than 3 seconds late is captured in an OutputTag; print both the normal output and the severely late records.
package cn.itcast.flink.basestone;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Date 2021/6/18 17:47
 * Desc allowed-lateness demo: severely late data is routed to a side output
 */
public class LatenessWatermarkDemo {
    public static void main(String[] args) throws Exception {
        //this demo stores severely late data in an OutputTag<Order>
        //and prints both the normal output and the severely late records
        //1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //older versions defaulted to ProcessingTime; recent versions default to EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2. Source: emit Order(orderId:String, userId:Integer, money:Integer, eventTime:Long)
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //simulate lateness: event time = current time - (0..9 s at random)
                            System.currentTimeMillis() - rm.nextInt(10) * 1000
                    ));
                    //Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //define the OutputTag that will hold data beyond the maximum allowed lateness
        OutputTag<Order> seriousLateOrder = new OutputTag<>("SeriousLateOrder", TypeInformation.of(Order.class));
        //3. Transformation
        SingleOutputStreamOperator<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
                )
                //the watermark strategy allows at most 3s of out-of-orderness:
                //watermark = current max event time - maximum allowed lateness
                //every 5s, compute each user's total order amount within the 5s tumbling window
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(seriousLateOrder)
                .sum("money");
        //4. Sink
        result.print("normal and mildly late data");
        result.getSideOutput(seriousLateOrder).print("severely late data");
        //5. execute
        env.execute();
    }

    //order POJO
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
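Taken together, the demo layers three defenses against late data: the 3-second watermark bound reorders mildly late data before a window first fires; allowedLateness(Time.seconds(3)) keeps the window state for 3 further seconds, so a late arrival re-triggers the window with an updated sum; and anything arriving after the watermark has passed window end + allowed lateness is diverted to the seriousLateOrder side output instead of being silently dropped.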
Flink State Management
- State is the intermediate result maintained per key or per operator
- Flink state comes in two kinds: Managed state and Raw state
- Managed state is further divided into two kinds (declared as sketched after this list):
  - Keyed state: state scoped to a key; supported structures are ValueState, ListState, MapState, and BroadcastState
  - Operator state: state scoped to an operator instance; the supported structure is ListState
- Raw state is kept as raw byte arrays
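Before the full demos, here is a minimal sketch (state names illustrative, not from the original) of how each managed keyed-state structure is declared inside a RichFunction; the handles must be obtained in open() from the runtime context.

package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

//the map body is a no-op; only the descriptor/handle pattern matters here
public class StateKindsSketch extends RichMapFunction<String, String> {
    private transient ValueState<Long> valueState;
    private transient ListState<Long> listState;
    private transient MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        //each state kind pairs a descriptor (name + type) with a handle from the runtime context
        valueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("maxState", Long.class));
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("historyState", Long.class));
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("cityCounts", String.class, Long.class));
    }

    @Override
    public String map(String value) throws Exception {
        return value; //real logic would read/update the state handles above
    }
}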
Flink Keyed State Example
- Requirement
Use ValueState from keyed state to track the maximum value in the stream (in practice, maxBy does this directly); here we implement it by hand with value state. Sample input:
<hello,1>
<hello,3>
<hello,2>
Input type: Tuple2<String /*word*/, Long /*count*/>; output type: Tuple3<String /*word*/, Long /*count*/, Long /*historical max*/>. (The demo code below uses <city, count> data instead of words, but the pattern is identical.)
- Implementation
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc keyed state demo: track the per-key maximum with ValueState
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //1. env, parallelism 1 for readable output
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Source: <city, count> => <city, max count>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );
        //3. Transformation
        //use ValueState to track the per-key maximum (in practice just use maxBy)
        //approach 1: maxBy -- prefer this in real code
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                //min tracks only the minimum of that field and ignores the others
                //minBy tracks the minimum of that field together with the rest of the record
                //max tracks only the maximum of that field and ignores the others
                //maxBy tracks the maximum of that field together with the rest of the record
                .maxBy(1);
        //approach 2: hand-rolled managed keyed state
        //3.1 key by f0, then map Tuple2<city, count> to Tuple3<city, count, historical max>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String /*city*/, Long /*count*/, Long /*historical max*/>>() {
                    //-1. value state holding the historical maximum for the current key
                    ValueState<Long> maxState = null;
                    //3.2 override RichMapFunction's open()
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2. define the state descriptor
                        ValueStateDescriptor<Long> maxStateDesc = new ValueStateDescriptor<>("maxState", Long.class);
                        //-3. obtain the state handle from the runtime context
                        maxState = getRuntimeContext().getState(maxStateDesc);
                    }
                    //3.3 override map()
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        //-4. compare the historical maximum in state with the current value
                        Long maxValue = maxState.value();
                        Long curValue = value.f1;
                        //-5. if the current value is larger (or there is no history yet),
                        //update the state; return the Tuple3 result either way
                        if (maxValue == null || curValue > maxValue) {
                            maxState.update(curValue);
                            return Tuple3.of(value.f0, value.f1, curValue);
                        } else {
                            return Tuple3.of(value.f0, value.f1, maxValue);
                        }
                    }
                });
        //4. Sink
        //result1.print();
        result2.print();
        //5. execute
        env.execute();
    }
}
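In real code, prefer maxBy: it is shorter and Flink manages the comparison state internally. The hand-rolled version is shown because it exposes the pattern every keyed-state operator follows: define a state descriptor, obtain the handle in open(), and read/update it per record in map(). Since the state is keyed, the single maxState field transparently resolves to a separate maximum per city.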
Flink Operator State Example
- Requirement
Use ListState to store an offset, mimicking how a Kafka consumer maintains its offset.
- Implementation
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc operator state demo: use ListState to mimic Kafka offset maintenance
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1. create the stream environment; parallelism 1 for readable output
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. enable checkpointing and store state under file:///D:/chk
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        //3. checkpoint configuration: externalized checkpoints, exactly-once semantics, etc.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4. restart strategy: up to 3 restarts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //5. add the custom source MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6. print
        source.print();
        //7. execute
        env.execute();
    }

    //extends RichParallelSourceFunction<String> and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;

        //initializeState: define the ListStateDescriptor and obtain offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        //run: restore the offset, then loop offset += 1 and emit (subtask index, offset)
        //once per second, simulating a failure every 5 records
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset = offset + 1;
                //index of the subtask executing this source instance
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                //emit through the context (the original printed to stdout instead,
                //so source.print() downstream showed nothing)
                ctx.collect("index:" + idx + " offset:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("Simulating a failure....");
                    throw new Exception("Simulated bug...");
                }
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        //snapshotState: clear offsetState and add the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
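The lifecycle to observe when running this: snapshotState() runs on every checkpoint (once per second, given enableCheckpointing(1000)) and copies the in-memory offset into the ListState; the simulated exception every 5 offsets triggers the fixed-delay restart strategy (up to 3 restarts), after which initializeState() restores the last checkpointed offset, so the printed offsets resume near the failure point instead of starting over from zero.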