day05_Flink Fault Tolerance
Today's Goals
- Flink fault tolerance: Checkpoint
- Flink fault tolerance: restart strategies
- Storage media: StateBackend
- Checkpoint configuration
- State recovery and restart strategies
- Manual restart and recovery with Savepoint
- Parallelism settings
Flink State Management
- State is the intermediate result kept per key or per operator
- Flink state comes in two flavors: managed state and raw state
- Managed state is further split into two kinds (a declaration sketch follows this list):
  - Keyed state: state scoped to a key
    Supported data structures: ValueState, ListState, MapState, BroadcastState
  - Operator state: state scoped to an operator
    Supported data structures: byte arrays, ListState
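The keyed-state types above all follow the same pattern: declare a descriptor, then fetch the state from the runtime context inside a rich function. A minimal sketch (the class name, state names, and types are illustrative only; broadcast state is the exception and is obtained through a BroadcastProcessFunction context rather than the runtime context):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream, e.g. stream.keyBy(t -> t.f0).flatMap(new KeyedStateKinds())
public class KeyedStateKinds extends RichFlatMapFunction<Tuple2<String, Long>, String> {
    private ValueState<Long> valueState;      // a single value per key
    private ListState<Long> listState;        // a list of values per key
    private MapState<String, Long> mapState;  // a map per key

    @Override
    public void open(Configuration parameters) throws Exception {
        valueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("value", Long.class));
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("list", Long.class));
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map", String.class, Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<String> out) throws Exception {
        valueState.update(in.f1);  // overwrite the stored value
        listState.add(in.f1);      // append to the list
        mapState.put(in.f0, in.f1);// upsert into the map
        out.collect(in.f0);
    }
}
```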
Flink keyed state example
- Requirement
  Use ValueState (a keyed state) to track the maximum value in the data (in practice you would simply use maxBy); here we implement it by hand with a value state:
  <hello,1>
  <hello,3>
  <hello,2>
  Input: Tuple2<String/*word*/, Long/*count*/>, output: Tuple3<String/*word*/, Long/*count*/, Long/*historical max*/>
- Implementation
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc Use ValueState to track the historical maximum per key
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env, set parallelism to 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.Source: <city, count> => <city, max count>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );
        //3.Transformation
        //Use ValueState from keyed state to track the maximum value in the stream
        //(in practice simply use maxBy)
        //Option 1: use maxBy directly -- prefer this in real development
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS
                .keyBy(t -> t.f0)
                //min returns only the minimum field and ignores the others
                //minBy returns the minimum field together with the rest of the record
                //max returns only the maximum field and ignores the others
                //maxBy returns the maximum field together with the rest of the record
                .maxBy(1);
        //Option 2: implement it yourself with managed state
        //3.1.key by f0, then map Tuple2<String/*city*/, Long/*count*/>
        //    to Tuple3<String/*city*/, Long/*count*/, Long/*historical max*/>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    //-1.declare a value state to hold the maximum
                    ValueState<Long> maxState = null;

                    //3.2.override open of RichMapFunction
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.define the state descriptor
                        ValueStateDescriptor<Long> maxStateDesc =
                                new ValueStateDescriptor<>("maxState", Long.class);
                        //-3.obtain the state from the runtime context
                        maxState = getRuntimeContext().getState(maxStateDesc);
                    }

                    //3.3.override map
                    //-4.compare the historical maximum from state with the current element
                    //-5.if the current value is larger or there is no history yet,
                    //   update the state; return a Tuple3 result
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        //maximum currently stored in state
                        Long maxValue = maxState.value();
                        //current value
                        Long curValue = value.f1;
                        if (maxValue == null || curValue > maxValue) {
                            maxState.update(curValue);
                            return Tuple3.of(value.f0, value.f1, curValue);
                        } else {
                            return Tuple3.of(value.f0, value.f1, maxValue);
                        }
                    }
                });
        //4.Sink: print
        //result1.print();
        result2.print();
        //5.execute
        env.execute();
    }
}
```
Flink operator state example
- Requirement
  Use ListState to store an offset, simulating how a Kafka consumer maintains its offset
- Implementation
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc Use ListState to mimic how a Kafka source maintains its offset
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1.create the stream environment; parallelism 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.enable checkpointing and save state to file:///D:/chk
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        //3.checkpoint configuration: externalized checkpoints, exactly-once semantics, etc.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4.restart strategy: up to 3 attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //5.add the source: instantiate MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6.print
        source.print();
        //7.execute
        env.execute();
    }

    //MyMonitorKafkaSource extends RichParallelSourceFunction<String> and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {
        //the offset is kept in operator ListState
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;

        //initializeState: build the ListStateDescriptor and fetch offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc =
                    new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        //run: restore the offset, then loop offset += 1, print (subtask index, offset),
        //one record per second, throwing a simulated exception every 5 records
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset = offset + 1;
                //index of this parallel subtask
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("index:" + idx + " offset:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program hit an error....");
                    throw new Exception("Simulated BUG...");
                }
            }
        }

        //cancel: stop the loop
        @Override
        public void cancel() {
            flag = false;
        }

        //snapshotState: clear offsetState and store the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
```
Flink's Fault Tolerance Mechanism
- Checkpoint: a global snapshot of the current state of every operator in the job at some point in time, usually persisted to disk.
Checkpoint execution flow
- The JobManager (master node) triggers the checkpoint
- The JobManager injects a barrier signal that flows through source -> transformation -> sink; every operator it passes saves its current operator state to HDFS or a local file. Once every operator has finished its backup, that checkpoint is complete.
Storage media
- MemoryStateBackend: not recommended for production
- FsStateBackend: stores snapshots on HDFS or a local file system; suitable for production
- RocksDBStateBackend: stores state locally first, then asynchronously and incrementally persists it to HDFS; the usual choice for very large state (a configuration sketch follows this list)
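Switching between the three backends is a one-line change on the execution environment. A minimal sketch, assuming the Flink 1.x API used throughout this section and placeholder HDFS paths; the three calls are alternatives and only the last setStateBackend takes effect, and RocksDB additionally requires the flink-statebackend-rocksdb dependency:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendChoices {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.MemoryStateBackend: state lives on the JobManager heap -- demos only
        env.setStateBackend(new MemoryStateBackend());

        //2.FsStateBackend: snapshots go to a file system (local or HDFS)
        env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoints"));

        //3.RocksDBStateBackend: state in local RocksDB, checkpoints flushed to HDFS;
        //  the second argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs://node1:8020/flink-checkpoints", true));
    }
}
```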
Checkpoint Configuration
- Global configuration file flink-conf.yaml
```yaml
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
```
- Setting it in code
```java
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///D:/chk"));
```
- Requirement
  The operator state demo above, extended with the full set of checkpoint options
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc The operator state demo with the full set of checkpoint options
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1.create the stream environment; parallelism 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.enable checkpointing and save state to file:///D:/chk
        env.enableCheckpointing(1000);
        // set the state backend
        // FsStateBackend: file system
        // RocksDBStateBackend: RocksDB plugin, asynchronously and incrementally flushed to HDFS
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        // decide whether the checkpoint is kept when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // timeout for a single checkpoint
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // how many checkpoints may run concurrently
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setStateBackend(new FsStateBackend("hdfs://node1:8020/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://node1:8020/checkpoints"));
        //3.checkpoint configuration: exactly-once semantics, etc.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4.restart strategy: up to 3 attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //5.add the source: instantiate MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6.print
        source.print();
        //7.execute
        env.execute();
    }

    //MyMonitorKafkaSource extends RichParallelSourceFunction<String> and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {
        //the offset is kept in operator ListState
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;

        //initializeState: build the ListStateDescriptor and fetch offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc =
                    new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        //run: restore the offset, then loop offset += 1, print (subtask index, offset),
        //one record per second, throwing a simulated exception every 5 records
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset = offset + 1;
                //index of this parallel subtask
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("index:" + idx + " offset:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program hit an error....");
                    throw new Exception("Simulated BUG...");
                }
            }
        }

        //cancel: stop the loop
        @Override
        public void cancel() {
            flag = false;
        }

        //snapshotState: clear offsetState and store the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
```
State Recovery and Restart Strategies
- State recovery: restore the state saved in the checkpoint backup (e.g. on HDFS) back into the running program
- Restart strategy: when the program fails with an exception, restart it according to configured delays, attempt counts, and so on
- Classification of restart strategies (a configuration sketch follows this list):
  - No restart (noRestart)
  - Unlimited restarts (the default when checkpointing is enabled)
  - Fixed-delay restart: e.g. restart 3 times with a 5 s delay between attempts
  - Failure-rate restart: e.g. at most 5 restarts within 5 minutes, 1 minute between attempts
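A minimal sketch of configuring each strategy, using the example values from the list above (Flink 1.x API; the calls are alternatives and only the last setRestartStrategy takes effect):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyChoices {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.no restart: any failure kills the job
        env.setRestartStrategy(RestartStrategies.noRestart());

        //2.fixed delay: at most 3 attempts, 5 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));

        //3.failure rate: at most 5 failures within a 5 min window, 1 min between attempts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                5, Time.minutes(5), Time.minutes(1)));

        //4.if checkpointing is enabled and no strategy is set,
        //  Flink falls back to (effectively) unlimited restarts
    }
}
```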
- Requirement
  Read from a socket, enable checkpointing and a restart strategy, run a WordCount, and simulate a bug in the input.
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Date 2021/6/21 11:18
 * Desc WordCount with checkpointing and restart strategies; typing "bug" simulates a failure
 */
public class CheckpointRestartDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //===========checkpoint settings===========
        //take a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        //state storage: file:///d:/chk
        env.setStateBackend(new FsStateBackend("file:///d:/chk"));
        //minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //keep the checkpoint (do not delete it) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //checkpoint timeout: a checkpoint that has not finished within 60 s fails and is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //how many checkpoints may run at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //=============restart strategies===========
        //-1.default: with checkpointing enabled and no strategy configured, Flink restarts indefinitely
        //-2.no restart
        // env.setRestartStrategy(RestartStrategies.noRestart());
        //-3.fixed-delay restart: 3 attempts, 3 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //-4.failure-rate restart (used occasionally):
        //at most 3 failures within 5 minutes (the 3rd is final, i.e. at most 2 actual restarts),
        //10 s between attempts; note this call overrides the fixed-delay strategy above
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
        //2.Source: socket
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.Transformation
        //3.1 split each line on spaces and flatMap every word into a Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            //3.2 throw an exception when the word "bug" appears, simulating a failure
                            if (word.equals("bug")) {
                                System.out.println("The program hit an error...");
                                throw new Exception("Simulated BUG ...");
                            }
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                //3.3 group: batch uses groupBy, streaming uses keyBy
                .keyBy(t -> t.f0)
                //3.4 aggregate
                .sum(1);
        //4.Sink: print
        result.print();
        //5.execute
        env.execute();
    }
}
```
Savepoint: Manual Restart and Recovery
- Manually trigger a savepoint backup:
  flink savepoint <jobId> <targetDirectory>
- Restore from a savepoint (a concrete example follows):
  flink run -s <savepointPath> --class <fully.qualified.MainClass> <path/to/job.jar>
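For instance, a hypothetical session (the job ID, savepoint path, class name, and jar path below are placeholders, not values from this course):

```
flink savepoint 7d9e29cbd4c2f327a02e3c36dcf28bf6 hdfs://node1:8020/flink-savepoints
flink run -s hdfs://node1:8020/flink-savepoints/savepoint-7d9e29-0123456789ab \
    --class cn.itcast.flink.state.CheckpointRestartDemo /root/flink-demo.jar
```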
Parallelism Settings
- Four ways to set the parallelism (a sketch follows this section):
  - Operator level
  - Execution environment (global) level
  - Client level (command line)
  - Configuration file
- Priority
  - 1. operator level > 2. global environment level > 3. client level > 4. configuration file
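A minimal sketch of the two code-level settings, with the client and configuration-file levels shown as comments (the parallelism values and elements are placeholders):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.global (environment) level: the default for every operator in this job
        env.setParallelism(2);

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                //1.operator level: overrides the global setting for this map only
                .setParallelism(4)
                .print();

        env.execute();

        //3.client level:        flink run -p 4 ...        (the -p flag of the CLI)
        //4.configuration file:  parallelism.default: 1    (in flink-conf.yaml)
    }
}
```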