2021年最新最全Flink系列教程__Flink容错机制(五)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 2021年最新最全Flink系列教程__Flink容错机制(五)

day05_Flink容错机制

今日目标

  • Flink容错机制之Checkpoint
  • Flink容错机制之重启策略
  • 存储介质StateBackend
  • Checkpoint 配置方式
  • 状态恢复和重启策略
  • Savepoint手动重启并恢复
  • 并行度设置

Flink状态管理

  • 状态就是基于 key 或者 算子 operator 的中间结果
  • Flink state 分为两种 : Managed state - 托管状态 , Raw state - 原始状态
  • Managed state 分为 两种:
  1. keyed state 基于 key 上的状态
    支持的数据结构 valueState listState mapState broadcastState
  2. operator state 基于操作的状态
    字节数组, ListState

Flink keyed state 案例

  • 需求
    使用KeyedState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,
    <hello,1>
    <hello,3>
    <hello,2>
    输入Tuple2<String/单词/, Long/长度/> 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/> 类型
  • 开发
package cn.itcast.flink.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc TODO
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env 设置并发度为1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.Source 参看课件 <城市,次数> => <城市,最大次数>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );
        //3.Transformation
        //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
        //实现方式1:直接使用maxBy--开发中使用该方式即可
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                //min只会求出最小的那个字段,其他的字段不管
                //minBy会求出最小的那个字段和对应的其他的字段
                //max只会求出最大的那个字段,其他的字段不管
                //maxBy会求出最大的那个字段和对应的其他的字段
                .maxBy(1);
        //实现方式2:通过managed state输入的state
        //3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2<String/*城市*/, Long/*次数*/> 输出 Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>
        //
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t->t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>>() {
            ValueState<Long> maxState = null;
            //-1.定义值类型的状态用来存储最大值
            //3.2.重写 RichMapFunction 的open 方法
            @Override
            public void open(Configuration parameters) throws Exception {
                //-2.定义状态描述符
                //-3.从当前上下文获取内存中的状态值
                ValueStateDescriptor maxStateDesc = new ValueStateDescriptor("maxState", Long.class);
                maxState = getRuntimeContext().getState(maxStateDesc);
            }
            //3.3.重写 map 方法
            //-4.获取state中历史最大值value和当前元素的最大值并比较
            @Override
            public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                //内存中state的存储的最大值
                Long maxValue = maxState.value();
                //当前的值
                Long curValue = value.f1;
                if (maxValue == null || curValue > maxValue) {
                    maxState.update(curValue);
                    return Tuple3.of(value.f0, value.f1, curValue);
                } else {
                    return Tuple3.of(value.f0, value.f1, maxValue);
                }
            }
        });
        //-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果
        //4.Sink 打印输出
        //result1.print();
        result2.print();
        //5.execute 执行环境
        env.execute();
    }
}

Flink operator state 案例

  • 需求
    使用ListState存储offset模拟消费Kafka的offset维护
  • 实现
package cn.itcast.flink.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc TODO
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1.创建流环境,便于观察设置并行度为 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.开启checkpoint ,并将状态保存到 file:///D:/chk ,先开启checkpoint ,state管理
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        //3.设置checkpoint的配置 外部chk,仅一次语义等
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4.开启重启策略 3秒钟尝试重启3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
        //5.添加数据源比如 MyMonitorKafkaSource , 实例化创建 MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6.打印输出
        source.print();
        //7.执行
        env.execute();
    }
    //创建 MyMonitorKafkaSource 继承 RichParallelSourceFunction<String> 并实现 CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
    implements CheckpointedFunction{
        //重写initializeState方法 ListStateDescriptor 状态描述和通过context获取 offsetState
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }
        //重写run方法 读取出 offset 并 循环读取offset+=1,拿到执行的核心编号,输出(核编号和offset),一秒一条,每5条模拟一个异常
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if(iterator.hasNext()){
                offset = iterator.next();
            }
            while(flag){
                offset = offset + 1;
                //处理 CPU 核心Index
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("index:"+idx+" offset:"+offset);
                Thread.sleep(1000);
                if(offset % 5 ==0){
                    System.out.println("当前程序出错了....");
                    throw new Exception("程序出BUG...");
                }
            }
        }
        //重写cancel方法
        @Override
        public void cancel() {
            flag = false;
        }
        //重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}

Flink的容错机制

  • checkpoint : 某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上。

checkpoint 的执行流程

  • 触发checkpoint , JobManager 主节点
  • JobManager 触发 barrier 信号, 给 source -> transformation -> sink , 都会触发,将当前算子 operator state 保存到 HDFS 或者本地文件上, 每个operator 都备份完, 当前一个 checkpoint 就执行完毕了。

存储介质

  • memoryStatebackend 生产环境不推荐
  • FsStatebackend 就是存储到 HDFS 或者本地文件系统上 ,都可以用于生产环境。
  • RocksdbStatebackend 先在本地进行存储, 异步增量的存储到 HDFS 文件系统上, 一般支持大的中间state 场景

Checkpoint 配置方式

  1. 全局的配置文件 flink-conf.yaml
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
  1. 通过代码进行设置
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///D:/chk"));
  1. 需求
package cn.itcast.flink.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc TODO
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1.创建流环境,便于观察设置并行度为 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.开启checkpoint ,并将状态保存到 file:///D:/chk ,先开启checkpoint ,state管理
        env.enableCheckpointing(1000);
        // 设置 state backend FsStateBackend : 文件系统
        // RocksdbStateBackend : rocksdb 插件 异步增量刷新到 HDFS 文件系统中
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        // 设置 checkpoint,如果当前的任务被取消,确定当前的checkpoint 是否删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig
                .ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置当前的 checkpoint 的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 两个checkpoint 之间最短的间隔时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 设置当前并行执行的 checkpoint 的个数
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setStateBackend(new FsStateBackend("hdfs://node1:8020/checkpoints"));
        //env.setStateBackend(new RocksdbStateBackend())
        //3.设置checkpoint的配置 外部chk,仅一次语义等
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4.开启重启策略 3秒钟尝试重启3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
        //5.添加数据源比如 MyMonitorKafkaSource , 实例化创建 MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6.打印输出
        source.print();
        //7.执行
        env.execute();
    }
    //创建 MyMonitorKafkaSource 继承 RichParallelSourceFunction<String> 并实现 CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
    implements CheckpointedFunction{
        //重写initializeState方法 ListStateDescriptor 状态描述和通过context获取 offsetState
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }
        //重写run方法 读取出 offset 并 循环读取offset+=1,拿到执行的核心编号,输出(核编号和offset),一秒一条,每5条模拟一个异常
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if(iterator.hasNext()){
                offset = iterator.next();
            }
            while(flag){
                offset = offset + 1;
                //处理 CPU 核心Index
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("index:"+idx+" offset:"+offset);
                Thread.sleep(1000);
                if(offset % 5 ==0){
                    System.out.println("当前程序出错了....");
                    throw new Exception("程序出BUG...");
                }
            }
        }
        //重写cancel方法
        @Override
        public void cancel() {
            flag = false;
        }
        //重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}

状态恢复和重启策略

  • 状态恢复: 从checkpoint 备份的HDFS 将数据 state 恢复到当前程序中
  • 重启策略: 当前程序出现异常,不同时间、重启次数等方式进行程序重启
  • 重启策略的分类:
  1. 没有重启 noStrategy
  2. 一直重启 (默认)
  3. 固定延迟重启策略 , 重启3次,每次之间延时时间5s
  4. 失败率重启策略, 5分钟之内, 重启5次, 每次1分钟
  • 需求
    从 socket 读取数据,使用 checkpoint 和重启机制, wordcount 统计, 模拟 输入的bug。
package cn.itcast.flink.state;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Date 2021/6/21 11:18
 * Desc TODO
 */
public class CheckpointRestartDemo {
    public static void main(String[] args) throws Exception {
        //1.env 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //===========Checkpoint参数设置====
        //设置Checkpoint的时间间隔为1000ms做一次Checkpoint
        env.enableCheckpointing(1000);
        //设置State状态存储介质 file:///d:/chk
        env.setStateBackend(new FsStateBackend("file:///d:/chk"));
        //设置两次checkpoint之间的最短间隔时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是
        //当作业被取消时候,保留checkpoint不删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig
                .ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //设置checkpointMode为 Exactly_Once 语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //设置同一时间有多少个checkpoint可以同时执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //=============重启策略===========
        //-1.默认策略:配置了Checkpoint而没有配置重启策略默认使用无限重启
        //-2.配置无重启策略
//        env.setRestartStrategy(RestartStrategies.noRestart());
        //-3.固定延迟重启策略
        //重启3次,每次间隔10s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //-4.失败率重启--偶尔使用
        //5分钟内重启3次(第3次不包括,也就是最多重启2次),每次间隔10s
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
        //2.Source 创建socket数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.Transformation
        //3.1空格切割出每个单词并flatMap转换成 Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    //3.2遍历循环如果遇到 bug 单词就抛出异常模拟程序报错
                    if (word.equals("bug")) {
                        System.out.println("当前程序报错...");
                        throw new Exception("程序 BUG ...");
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        })
                //3.3分组
                //注意:批处理的分组是groupBy,流处理的分组是keyBy
                .keyBy(t -> t.f0)
                //3.4聚合
                .sum(1);
        //4.sink 打印输出
        result.print();
        //5.execute
        env.execute();
    }
}

Savepoint 手动重启并恢复

  • 手动备份和恢复
flink savepoint jobid 存储路径
  • 从 savepoint 恢复数据
flink run -s 存储路径 --class 全路径类名 --jar jar包

并行度设置

  • 并行度设置四种
  1. 算子级别
  2. 全局并行度
  3. 客户端(黑窗口)并行度
  4. 配置文件设置
  • 优先级
  1. 算子级别 => 2. 全局的并行度设置 => 3.客户端设置并行度 => 4.配置文件并行度


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5月前
|
流计算
JD Flink教程
JD Flink教程
21 0
|
5月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
239 0
|
5月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
211 0
|
7天前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
7天前
|
数据处理 Apache 流计算
【Flink】Flink 中的Watermark机制
【4月更文挑战第21天】【Flink】Flink 中的Watermark机制
|
7天前
|
存储 数据处理 Apache
【Flink】Flink状态机制
【4月更文挑战第21天】【Flink】Flink状态机制
|
4月前
|
消息中间件 存储 Kafka
在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
【1月更文挑战第19天】【1月更文挑战第91篇】在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
80 3
|
5月前
|
Apache 流计算
Apache Flink教程----2.本地开发
Apache Flink教程----2.本地开发
43 0
|
5月前
|
SQL 分布式计算 Java
2021年最新最全Flink系列教程__Flink综合案例(九)
2021年最新最全Flink系列教程__Flink综合案例(九)
37 0
|
5月前
|
消息中间件 NoSQL 数据挖掘
2021年最新最全Flink系列教程__Flink高级特性和新特性(八)
2021年最新最全Flink系列教程__Flink高级特性和新特性(八)
28 0