day03_Flink高级API
今日目标
- Flink的四大基石
- Flink窗口Window操作
- Flink时间Time
- Flink水印Watermark机制
- Flink的state状态管理-keyed state 和 operator state
Flink的四大基石
- Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
- State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
- Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
- Window窗口,TimeWindow 、 countwindow、 sessionwindow
Window操作
Window分类
- time
- 用的比较多 滚动窗口和滑动窗口
- count
如何使用
案例
- 需求
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */
- 分析
- 代码
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */ public class WindowDemo { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) .sum("count"); //统计 滑动窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .sum("count"); //4.打印输出 result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
- 需求2 - countwindow
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口 - 代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 */ public class WindowDemo02 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 滚动计数窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .countWindow(5) .sum("count"); //统计 滑动计数窗口 DataStream<CartInfo> result1 = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5秒钟统计一次,最近5秒钟内 .countWindow(10,5) .sum("count"); //4.打印输出 //result.printToErr(); result1.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
- 统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。
- 案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
- 代码
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 */ public class WindowDemo03 { public static void main(String[] args) throws Exception { //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.setParallelism(1); //2.获取数据源 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //3.转换操作 基于key window 统计 DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] split = value.split(","); return new CartInfo(split[0], Integer.parseInt(split[1])); } }); //统计 会话窗口 DataStream<CartInfo> result = cartInfoDS .keyBy(t -> t.getSensorId()) //使用的是处理时间 //每5条数据统计一次 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .sum("count"); //4.打印输出 //result.printToErr(); result.print(); //5.执行流环境 env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
Time 时间
- EventTime的重要性
- 防止出现网络抖动,造成数据的乱序,数据统计的丢失
- 窗口: 开始时间-结束时间
watermark 水印时间
- watermark 水印机制
- watermark 就是时间戳
- watermark = eventTime - maxDelayTime
- 触发计算 watermak >= 结束时间
watermark 案例
- 需求
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
基础版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); // 添加水印机制 最大允许延迟的时间为 3 秒 //orderDS.printToErr(); //分配水印机制 SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy //指定最大的延迟时间 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp); .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())) //统计每个用户对应 购买 金额 .keyBy(t -> t.getUserId()) //指定窗口,每5秒钟统计5秒钟之内的数据 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); sum.print(); // env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
- 扩展版:
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */ public class WatermarkDemo02 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); DataStream<Order> WatermarkDS = orderDS .assignTimestampsAndWatermarks( new WatermarkStrategy<Order>() { @Override public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Order>() { FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss"); private int userId = 0; private long eventTime = 0L; private final long outOfOrdernessMillis = 3000; private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; @Override public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) { userId = event.userId; eventTime = event.eventTime; maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { //Watermark = 当前最大事件时间 - 最大允许的延迟时间或乱序时间 时间戳 Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1); System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp())); output.emitWatermark(watermark); } }; } }.withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 /* DataStream<Order> result = WatermarkDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money");*/ //开发中使用上面的代码进行业务计算即可 //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermark时间 DataStream<String> result = WatermarkDS .keyBy(Order::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) //把apply中的函数应用在窗口中的数据上 //WindowFunction<IN, OUT, KEY, W extends Window> .apply(new WindowFunction<Order, String, Integer, TimeWindow>() { FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss"); @Override public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception { //准备一个集合用来存放属于该窗口的数据的事件时间 List<String> eventTimeList = new ArrayList<>(); for (Order order : input) { Long eventTime = order.eventTime; eventTimeList.add(df.format(eventTime)); } String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s", key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList); out.collect(outStr); } }); // 添加水印机制 最大允许延迟的时间为 3 秒 //orderDS.printToErr(); result.printToErr(); env.execute(); //分配水印机制 } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
Allowed lateness
- 案例
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
package cn.itcast.sz22.day03; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.time.Duration; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Author itcast * Date 2021/5/7 14:51 * 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) * 要求每隔5s,计算5秒内,每个用户的订单总金额 * 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。 */ public class WatermarkDemo03 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class)); //分配水印机制 eventTime 默认使用 maxDelay 3秒 SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime())) .keyBy(t -> t.getUserId()) //窗口设置 每隔5s,计算5秒内 .window(TumblingEventTimeWindows.of(Time.seconds(5))) //实例化侧输出流 主要用于晚于最大延迟 3 秒的数据 .allowedLateness(Time.seconds(3)) .sideOutputLateData(oot) //统计 .sum("money"); result.print("正常数据"); result.getSideOutput(oot).print("严重迟到的数据"); env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
状态管理 state
- 有状态管理场景
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-znYxlAeB-1624261970363)(assets/image-20210507151242102.png)]
- 是否被Flink托管分为两类
- managed state
通过Flink自身进行状态的管理
数据结构: valueState ListState mapState - raw state
需要用户、程序员自己维护状态
数据结构: ListState
- 是否基于 key 进行state 管理
- keyed state
数据结构: valueState ListState mapState
reducingState - operator state
数据结构: ListState
Keyed state
- 案例 - 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,输入Tuple2<String/单词/, Long/长度/> 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/> 类型
- map映射
- 定义valueState 用于统计当前的 历史最大值
- 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/>
package cn.itcast.sz22.day03; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Author itcast * Date 2021/5/7 15:58 * 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义. */ public class StateDemo01 { public static void main(String[] args) throws Exception { //1.env 设置并发度为1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.Source 参看课件 DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements( Tuple2.of("北京", 1L), Tuple2.of("上海", 2L), Tuple2.of("北京", 6L), Tuple2.of("上海", 8L), Tuple2.of("北京", 3L), Tuple2.of("上海", 4L) ); //3.Transformation //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可) //实现方式1:直接使用maxBy--开发中使用该方式即可 DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0) .maxBy(1); //min只会求出最小的那个字段,其他的字段不管 //minBy会求出最小的那个字段和对应的其他的字段 //max只会求出最大的那个字段,其他的字段不管 //maxBy会求出最大的那个字段和对应的其他的字段 //实现方式2:通过managed state输入的state //Tuple2<String/*单词*/, Long/*长度*/> 输出 Tuple3<String/*单词*/, Long/*长度*/, Long/*历史最大值*/> SingleOutputStreamOperator<Tuple3<String, Long, Long>> maxCount = tupleDS.keyBy(t -> t.f0) .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() { //保存当前内存中最大的值的state private transient ValueState<Long> currentMaxValue; @Override public void open(Configuration parameters) throws Exception { //存储到内存中的数据结构的描述 ValueStateDescriptor desc = new ValueStateDescriptor("maxCount", TypeInformation.of(Long.class)); currentMaxValue = getRuntimeContext().getState(desc); } @Override public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception { String city = value.f0; Long currentValue = value.f1; if (currentMaxValue.value() == null || currentMaxValue.value() < currentValue) { currentMaxValue.update(currentValue); return Tuple3.of(city, currentValue, currentMaxValue.value()); } else { return Tuple3.of(city, currentValue, currentMaxValue.value()); } } }); //3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2<String/*单词*/, Long/*长度*/> 输出 Tuple3<String/*单词*/, Long/*长度*/, Long/*历史最大值*/> //-1.定义值类型的状态用来存储最大值 //3.2.重写 RichMapFunction 的open 方法 //-2.定义状态描述符 //-3.从当前上下文获取内存中的状态值 //3.3.重写 map 方法 //-4.获取state中历史最大值value和当前元素的最大值并比较 //-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果 //4.Sink 打印输出 // result1.printToErr(); maxCount.print(); //5.execute 执行环境 env.execute(); } }
operate state
- 大多数场景就是读取 source ,用到数据结构 ListState
- 案例 - 使用ListState存储offset模拟Kafka的offset维护
package cn.itcast.sz22.day03; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; /** * Author itcast * Date 2021/5/7 16:59 * 使用ListState存储offset模拟Kafka的offset维护 */ public class OperatorStateDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学 env.enableCheckpointing(1000);//每隔 1s 执行一次Checkpoint //将全局的状态保存到哪里? hdfs://node1:8020/checkpoint/ env.setStateBackend(new FsStateBackend("file:///D:/ckp")); //当前任务被取消,checkpoint是否被保存下来 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //当前checkpoint 机制 EXACTLY_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000)); //2.Source DataStreamSource<String> sourceData = env.addSource(new MyMoniKafkaSource()); //3.Transformation //4.Sink sourceData.print(); //5.execute env.execute(); } //1.创建类 MyMoniKafkaSource 继承 RichparallelSourceFunction 并实现 CheckpointedFunction public static class MyMoniKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction{ //1.1. 定义ListState<Long>用于存储 offsetState、offset、flag ListState<Long> offsetState; Long offset = 0L; boolean flag = true; //1.2. 重写 initializeState 方法 // //创建List状态描述器 // //根据状态描述器初始化状态通过context @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offsetState", TypeInformation.of(Long.class)); offsetState = context.getOperatorStateStore() .getListState(desc); } @Override public void run(SourceContext<String> ctx) throws Exception { //获取并迭代ListState中的值,如果存在赋值给offset Iterable<Long> offsets = offsetState.get(); if(offsets.iterator().hasNext()){ offset = offsets.iterator().next(); } // //while(flag) while(flag){ //将处理的offset累加1、获取当前子任务的Index offset += 1; //ctx收集id和offset("分区:"+id+"消费到offset的位置为:"+offset int id = getRuntimeContext().getIndexOfThisSubtask(); ctx.collect("分区:"+id+"消费到offset的位置为:"+offset); Thread.sleep(2000); //)并输出 // //休眠2秒,此时保存state到checkpoint // //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复 if(offset%5==0){ System.out.println("当前程序出现bug"); throw new Exception("当前程序出现bug"); } } } @Override public void cancel() { flag = false; } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { //清空内存offsetState中存储的offset offsetState.clear(); //添加offset到state中 offsetState.add(offset); } } }
set的位置为:"+offset int id = getRuntimeContext().getIndexOfThisSubtask(); ctx.collect(“分区:”+id+“消费到offset的位置为:”+offset); Thread.sleep(2000); //)并输出 // //休眠2秒,此时保存state到checkpoint // //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复 if(offset%5==0){ System.out.println(“当前程序出现bug”); throw new Exception(“当前程序出现bug”); } } }
@Override public void cancel() { flag = false; } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { //清空内存offsetState中存储的offset offsetState.clear(); //添加offset到state中 offsetState.add(offset); } }
}