01 引言
在前面的博客,我们学习了Flink
的一些高级API
,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
- 《Flink教程(13)- Flink高级API(状态管理)》
在前面的教程,我们已经学习了Flink
的四大基石里面的State
了,如下图,本文讲解下Checkpoint
容错机制:
02 Checkpoint
2.1 Checkpoint VS State
State:
- 维护/存储的是某一个
Operator
的运行的状态/历史值,是维护在内存中; - 一般指一个具体的
Operator
的状态(operator
的状态表示一些算子在运行的过程中会产生的一些历史结果,如前面的maxBy
底层会维护当前的最大值,也就是会维护一个keyedOperator
,这个State
里面存放就是maxBy
这个Operator
中的最大值); State
数据默认保存在Java
的堆内存中/TaskManage
节点的内存中;State
可以被记录,在失败的情况下数据还可以恢复。
Checkpoint:
- 某一时刻,
Flink
中所有的Operator
的当前State
的全局快照,一般存在磁盘上; - 表示了一个
Flink Job
在一个特定时刻的一份全局状态快照,即包含了所有Operator
的状态; - 可以理解为
Checkpoint
是把State
数据定时持久化存储了; - 比如
KafkaConsumer
算子中维护的Offset
状态,当任务重新恢复的时候可以从Checkpoint
中获取。
注意:
Flink
中的Checkpoint
底层使用了Chandy-Lamport algorithm分布式快照算法可以保证数据的在分布式环境下的一致性!- Chandy-Lamport algorithm算法的作者也是
ZK
中Paxos一致性算法的作者
Flink
中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续Spark
的StructuredStreaming
也借鉴了该算法。
2.2 Checkpoint 执行流程
2.2.1 简单流程
流程描述:
- step1 :
Flink
的JobManager
创建CheckpointCoordinator
- step2 :
Coordinator
向所有的SourceOperator
发送Barrier
栅栏(理解为执行Checkpoint
的信号) - step3 :
SourceOperator
接收到Barrier
之后,暂停当前的操作(暂停的时间很短,因为后续的写快照是异步的),并制作State
快照, 然后将自己的快照保存到指定的介质中(如HDFS
), 一切ok
之后向Coordinator
汇报并将Barrier
发送给下游的其他Operator
- step4 : 其他的如
TransformationOperator
接收到Barrier,
重复第2步,最后将Barrier
发送给Sink
- step5 :
Sink
接收到Barrier
之后重复第2步 - step6 :
Coordinator
接收到所有的Operator
的执行ok
的汇报结果,认为本次快照执行成功
注意:
- 在往介质(如
HDFS
)中写入快照数据的时候是异步的(为了提高效率) - 分布式快照执行时的数据一致性由
Chandy-Lamport algorithm
分布式快照算法保证
2.2.2 复杂流程
下图左侧是Checkpoint Coordinator
,是整个Checkpoint
的发起者,中间是由两个 source
,一个 sink
组成的Flink
作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS
。
step1 :Checkpoint Coordinator
向所有 source
节点 trigger Checkpoint
。
step2 :source
节点向下游广播 barrier
,这个barrier
就是实现 Chandy-Lamport
分布式快照算法的核心,下游的task
只有收到所有input
的 barrier
才会执行相应的Checkpoint
。
step3 :当 task
完成state
备份后,会将备份数据的地址(state handle
)通知给Checkpoint coordinator
step4 :下游的 sink
节点收集齐上游两个 input
的 barrier
之后,会执行本地快照,(栅栏对齐),这里还展示了RocksDB incremental Checkpoint
(增量Checkpoint
)的流程,首先RocksDB
会全量刷数据到磁盘上(红色大三角表示),然后Flink
框架会从中选择没有上传的文件进行持久化备份(紫色小三角)
step5 :同样的,sink
节点在完成自己的 Checkpoint
之后,会将 state handle
返回通知 Coordinator
step6 :最后,当 Checkpoint coordinator
收集齐所有task
的state handle
,就认为这一次的Checkpoint
全局完成了,向持久化存储中再备份一个 Checkpoint meta
文件。
2.3 State状态后端/State存储介质
注意:前面学习了Checkpoint
其实就是Flink
中某一时刻,所有的Operator
的全局快照,那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端,Flink
中的State
状态后端有很多种,如下:
功能 | MemStateBackend | FastStateBackend | RocksDBStateBackend |
构造方法 | MemoryStateBackend(int maxStateSize,boolean asynchronousSnapshots) | FsStateBackend(URI checkpointDataUri,boolean asynchronousSnapshots) | RocksDBStateBackend(URLI checkpointDataUri,boolean enableIncrementalCheckpointing) |
存储方式 | State(TaskManager内存)、Checkpoint(JobManager内存) | State(TaskManager内存),Checkpoint(外部文件系统、本地或HDFS) | State(TaskManager上的KV数据库,实际使用内存+磁盘),Checkpoint(外部文件系统,本地或HDFS) |
容量限制 | 单个State maxStateSize(默认为5M),maxStateSize<akka.framesize(默认为10M),总大小不超过JobManager内存 | 单个TaskManager上State总量不超过它的内存,总大小不超过配置的文件系统容量 | 单个TaskManger上的State总量不超过它的内存+磁盘,单个Key最大2G,总大小不超过配置的文件系统容量 |
推荐使用的场景 | 本地测试(几乎无状态的作业,比如:ETL,JobManager不容易挂,或者挂掉影响不大的情况),不推荐生产环境使用 | 常规使用状态的作业(例如:分钟级窗口聚合、join),需要开启HA的作业,可以在生产场景使用 | 超大状态的作业(例如:天级窗口聚合)、需要开启HA的作业,对状态读写性能要求不高的作业、可以在生产场景使用 |
2.3.1 MemStateBackend
- 内存存储,即
MemoryStateBackend
,构造方法是设置最大的StateSize
,选择是否做异步快照; - 对于
State
状态存储在TaskManager
节点也就是执行节点内存中的,因为内存有容量限制,所以单个State maxStateSize
默认 5 M,且需要注意maxStateSize
<=akka.framesize
默认 10 M; - 对于
Checkpoint
存储在JobManager
内存中,因此总大小不超过JobManager
的内存; - 推荐使用的场景为:本地测试、几乎无状态的作业,比如
ETL
、JobManager
不容易挂,或挂掉影响不大的情况; - 不推荐在生产场景使用。
2.3.2 FastStateBackend
- 另一种就是在文件系统上的
FsStateBackend
构建方法是需要传一个文件路径和是否异步快照。 State
依然在TaskManager
内存中,但不会像MemoryStateBackend
是 5 M 的设置上限Checkpoint
存储在外部文件系统(本地或HDFS
),打破了总大小Jobmanager
内存的限制。- 推荐使用的场景为:常规使用状态的作业、例如分钟级窗口聚合或
join
、需要开启HA
的作业。 - 如果使用
HDFS
,则初始化FsStateBackend
时,需要传入以 “hdfs://
”开头的路径(即:new FsStateBackend("hdfs:///hacluster/checkpoint"))
, - 如果使用本地文件,则需要传入以“
file://
”开头的路径(即:new FsStateBackend("file:///Data"))
。 - 在分布式情况下,不推荐使用本地文件。因为如果某个算子在节点
A
上失败,在节点B
上恢复,使用本地文件时,在B
上无法读取节点A
上的数据,导致状态恢复失败。
2.3.3 RocksDBStateBackend
RocksDB
是一个key/value
的内存存储系统,和其他的key/value
一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中- 但需要注意
RocksDB
不支持同步的Checkpoint
,构造方法中没有同步快照这个选项。 - 不过
RocksDB
支持增量的Checkpoint
,意味着并不需要把所有sst
文件上传到Checkpoint
目录,仅需要上传新生成的sst
文件即可。 - 它的
Checkpoint
存储在外部文件系统(本地或HDFS
) - 其容量限制只要单个
TaskManager
上State
总量不超过它的内存+磁盘,单Key
最大2G
,总大小不超过配置的文件系统容量即可 - 推荐使用的场景为:超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业。
2.4 Checkpoint配置方式
2.4.1 全局配置
修改flink-conf.yaml
:
#这里可以配置 #jobmanager(即MemoryStateBackend), #filesystem(即FsStateBackend), #rocksdb(即RocksDBStateBackend) state.backend: filesystem state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
2.4.2 代码配置
java代码:
//1.MemoryStateBackend--开发中不用 env.setStateBackend(new MemoryStateBackend) //2.FsStateBackend--开发中可以使用--适合一般状态--秒级/分钟级窗口... env.setStateBackend(new FsStateBackend("hdfs路径或测试时的本地路径")) //3.RocksDBStateBackend--开发中可以使用--适合超大状态--天级窗口... env.setStateBackend(new RocksDBStateBackend(filebackend, true))
注意RocksDBStateBackend
还需要引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.7.2</version> </dependency>
2.5 示例代码
/** * 演示Checkpoint参数设置(也就是Checkpoint执行流程中的步骤0相关的参数设置) * * @author : YangLinWei * @createTime: 2022/3/8 9:48 上午 */ public class CheckpointDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //===========Checkpoint参数设置==== //===========类型1:必须参数============= //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier! env.enableCheckpointing(1000); //设置State状态存储介质 /*if(args.length > 0){ env.setStateBackend(new FsStateBackend(args[0])); }else { env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp")); }*/ if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp")); } else { env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint")); } //===========类型2:建议参数=========== //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了) //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0 //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是 //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败 //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //===========类型3:直接使用默认的即可=============== //设置checkpoint的执行模式为EXACTLY_ONCE(默认) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。 env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟 //设置同一时间有多少个checkpoint可以同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1 //2.Source DataStream<String> linesDS = env.socketTextStream("node1", 9999); //3.Transformation //3.1切割出每个单词并直接记为1 DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //value就是每一行 String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } }); //3.2分组 //注意:批处理的分组是groupBy,流处理的分组是keyBy KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0); //3.3聚合 DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1); DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() { @Override public String map(Tuple2<String, Integer> value) throws Exception { return value.f0 + ":::" + value.f1; } }); //4.sink result.print(); Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1:9092"); FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props); result.addSink(kafkaSink); //5.execute env.execute(); // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka } }
03 状态恢复和重启策略
3.1 自动重启策略和恢复
3.1.1 重启策略配置方式
如果在配置文件中,可以在flink-conf.yml
中可以进行配置,示例如下:
restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
还可以在代码中针对该任务进行配置,示例如下:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重启次数 Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔 ))
3.1.2 重启策略分类
3.1.2.1 默认重启策略
如果配置了Checkpoint
,而没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启。
3.1.2.2 无重启策略
Job直接失败,不会尝试进行重启
配置文件的方式:
restart-strategy: none
代码配置的方式:
val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.noRestart())
3.1.2.3 固定延迟重启策略
重启策略可以配置flink-conf.yaml
的下面配置参数来启用,作为默认的重启策略:
restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
也可以在程序中设置:
// 表示:如果job失败,重启3次, 每次间隔10 val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 最多重启3次数 Time.of(10, TimeUnit.SECONDS) // 重启时间间隔 ))
3.1.2.4 失败率重启策略
失败率重启策略可以在flink-conf.yaml
中设置下面的配置参数来启用:
restart-strategy:failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s
失败率重启策略也可以在程序中设置:
//表示:如果5分钟内job失败不超过三次,自动重启, //每次间隔10s (如果5分钟内程序失败超过3次,则程序退出) val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每个测量时间间隔最大失败次数 Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔 Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔 ))
3.1.2.5 示例代码
/** * 演示Checkpoint+重启策略 * * @author : YangLinWei * @createTime: 2022/3/8 10:01 上午 */ public class CheckpointDemo02 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //===========Checkpoint参数设置==== //===========类型1:必须参数============= //设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier! env.enableCheckpointing(1000); //设置State状态存储介质 /*if(args.length > 0){ env.setStateBackend(new FsStateBackend(args[0])); }else { env.setStateBackend(new FsStateBackend("file:///D:/ckp")); }*/ if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend("file:///D:/ckp")); } else { env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint")); } //===========类型2:建议参数=========== //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了) //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0 //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是 //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败 //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //===========类型3:直接使用默认的即可=============== //设置checkpoint的执行模式为EXACTLY_ONCE(默认) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。 env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟 //设置同一时间有多少个checkpoint可以同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1 //=============重启策略=========== //-1.默认策略:配置了Checkpoint而没有配置重启策略默认使用无限重启 //-2.配置无重启策略 //env.setRestartStrategy(RestartStrategies.noRestart()); //-3.固定延迟重启策略--开发中使用! //重启3次,每次间隔10s /*env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, //尝试重启3次 Time.of(10, TimeUnit.SECONDS))//每次重启间隔10s );*/ //-4.失败率重启--偶尔使用 //5分钟内重启3次(第3次不包括,也就是最多重启2次),每次间隔10s /*env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每个测量时间间隔最大失败次数 Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔 Time.of(10, TimeUnit.SECONDS) // 每次重启的时间间隔 ));*/ //上面的能看懂就行,开发中使用下面的代码即可 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); //2.Source DataStream<String> linesDS = env.socketTextStream("node1", 9999); //3.Transformation //3.1切割出每个单词并直接记为1 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //value就是每一行 String[] words = value.split(" "); for (String word : words) { if (word.equals("bug")) { System.out.println("手动模拟的bug..."); throw new RuntimeException("手动模拟的bug..."); } out.collect(Tuple2.of(word, 1)); } } }); //3.2分组 //注意:批处理的分组是groupBy,流处理的分组是keyBy KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1); //4.sink result.print(); //5.execute env.execute(); } }
3.2 手动重启并恢复
1.把程序打包
2.启动Flink集群(本地单机版,集群版都可以)
/export/server/flink/bin/start-cluster.sh
3.访问webUI
4.使用FlinkWebUI提交
5.取消任务
6.重新启动任务并指定从哪恢复(例如:hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34
)
7.关闭/取消任务
8.关闭集群
/export/server/flink/bin/stop-cluster.sh
04 Savepoint
4.1 Savepoint介绍
保存点(Savepoint):类似于以前玩游戏的时候,遇到难关了/遇到boss了,赶紧手动存个档,然后接着玩,如果失败了,赶紧从上次的存档中恢复,然后接着玩。
在实际开发中,可能会遇到这样的情况:如要对集群进行停机维护/扩容…
那么这时候需要执行一次Savepoint
也就是执行一次手动的Checkpoint
/也就是手动的发一个barrier
栅栏,那么这样的话,程序的所有状态都会被执行快照并保存。
当维护/扩容完毕之后,可以从上一次Savepoint
的目录中进行恢复!
4.2 Savepoint VS Checkpoint
功能 | Checkpoint | Savepoint |
触发管理方式 | 由Flink自动触发并管理 | 由用户手动触发并管理 |
主要用途 | 在Task发生异常时快速恢复(例如网络抖动导致的超时异常) | 有计划地机型备份,使作业能停止后回复(例如修改代码,调整并发) |
特点 | 轻量、自动从故障中恢复、在作业停止后默认清除 | 持久、以标准格式存储,允许代码或配置发生改变、手动触发从Savepoint的恢复 |
目标 | 任务失败的恢复/故障转移机制 | 手动备份/重启/回复任务 |
实现 | 轻量快速 | 注重可移植性,成本较高 |
生命周期 | Flink自身控制 | 用户手动控制 |
4.3 Savepoint 案例演示
# 启动yarn session /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d # 运行job-会自动执行Checkpoint /export/server/flink/bin/flink run --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar # 手动创建savepoint--相当于手动做了一次Checkpoint /export/server/flink/bin/flink savepoint 702b872ef80f08854c946a544f2ee1a5 hdfs://node1:8020/flink-checkpoint/savepoint/ # 停止job /export/server/flink/bin/flink cancel 702b872ef80f08854c946a544f2ee1a5 # 重新启动job,手动加载savepoint数据 /export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/savepoint/savepoint-702b87-0a11b997fa70 --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar # 停止yarn session yarn application -kill application_1607782486484_0014
05 文末
本文主要讲解了Flink
高级API
里面的容错机制,谢谢大家的阅读,本文完!