前提
笔者之前在查找Sentinel相关资料的时候,偶然中找到了Martin Fowler
大神的一篇文章《CircuitBreaker》。于是花了点时间仔细阅读,顺便温习一下断路器CircuitBreaker
的原理与实现。
CircuitBreaker的原理
现实生活中的熔断器(更多时候被称为保险丝)是一种安装在电路中用于保证电路安全运行的电子元件。它的外形一般是一个绝缘的玻璃容器包裹着一段固定大小电阻和固定熔点的纤细合金导体,如下图:
电路中,保险丝会和其他用电的原件串联,根据物理公式Q = I^2*R*T
(Q
为热能值,也理解为保险丝熔断的极限热能值,I
为电流中的电流,R
为保险丝固定电阻,T
为时间),如果电路中有其他用电的原件短路,会导致电流I
十分大,导致在T
很小的情况下,计算得到的Q
远大于保险丝熔断的极限热能值,保险丝就会被击穿熔断。这个时候整个电路处于断开状态,从而避免电路过载导致电路中的用电原件损毁。
电路中的电流过大会导致所有电阻比较大的电器发生大量积热,很容易出现火灾,所以保险丝在过去曾经起到巨大的作用。后来出现了更加先进的"空气开关",漏电开关多数都升级为此实现,保险丝依然会应用在各种的原件中,但是几乎淡出了日常生活触及的视线范围。
记得小时候某个傍晚爷爷拉开了白炽灯,啪的一声整个屋子的电器都停了,突然停电了。他说了句:保险丝"烧"了,等我换一条。换上保险丝把总闸门打上去后供电恢复。
从上面的分析可见:现实中的熔断器是一次性使用的消耗品,而且有些场景下需要人为干预(更换)。
软件系统中的CircuitBreaker
在设计上是借鉴了现实生活中熔断器的功能并且做出改良而诞生的一种模式。这个模式出现的背景是:随着软件和计算机网络的发展,以及当前微服务架构的普及,应用会部署在不同的计算机上或者同一台计算机的不同进程上,那么需要通过远程调用进行交互。远程调用和单进程的内存态调用的最大区别之一是:远程调用有可能因为各种原因出现调用失败、没有响应的挂起(其实就是无超时期限的等待)或者直到某个超时的期限才返回结果。这些故障会导致调用方的资源被一直占用无法释放(最常见的就是调用方接收请求或者处理请求的线程被长时间挂起):
如果发生故障的被调用方节点刚好是关键节点,并且此故障节点的上游调用者比较多(例如上图中的内部网关),那么级联故障会蔓延,极端情况下甚至会耗尽了整个服务集群中的所有资源。如果在服务的远程调用加入了CircuitBreaker
组件,那么单个服务调用的效果如下:
断路器CircuitBreaker
的基本原理比较简单:将受保护的函数(方法)包装在断路器对象中进行调用,断路器对象将会监视所有的调用相关的数据(主要是统计维度的数据,一般方法参数可以过滤)。一旦出现故障的调用达到了某个阈值或者触发了某些规则,断路器就会切换为Open
状态,所有经由断路器的调用都会快速失败,请求不会到达下游被调用方。笔者认为从实际来看,CircuitBreaker
的核心功能就是三大块:
- 调用数据度量统计。
- 维护断路器自身的状态。
- 基于前两点保护包裹在断路器中执行的调用。
基于调用数据的度量统计一般会引入JDK8
中的原子(Atomic
)类型。下游被调用方不会一直处于故障,为了断路器能够自恢复,引入了Half_Open
状态和滑动窗口的概念。同时,考虑到程序容器的线程阻塞带来的毁灭性影响,有时候可以考虑进行如下优化:断路器把受保护的调用基于定义好的资源标识选择特定的线程池(或者信号量)进行调用,充分利用FutureTask#get(long timeout, TimeUnit unit)
设定调用任务最大超时期限的优势,这样就能基本避免出现故障的远程调用阻塞了本应用容器的线程。
这里的容器是特指Tomcat、Netty、Jetty等。而这里提到的线程池隔离、滑动窗口等概念会在下文具体实现的时候再详细展开。
基于线程池隔离:
直接基于容器线程隔离:
CircuitBreaker的简易实现
这一小节会按照上一小节的理论,设计多种CircuitBreaker
的实现,由简单到复杂一步一步进行迭代。CircuitBreaker
的状态转换设计图如下:
基于此设计图,Martin Fowler
大神在其文章中也给予了伪代码如下:
class ResetCircuitBreaker... // 初始化 def initialize &block @circuit = block @invocation_timeout = 0.01 @failure_threshold = 5 @monitor = BreakerMonitor.new @reset_timeout = 0.1 reset end // 重置 def reset @failure_count = 0 @last_failure_time = nil @monitor.alert :reset_circuit end // 状态维护 def state case when (@failure_count >= @failure_threshold) && (Time.now - @last_failure_time) > @reset_timeout :half_open when (@failure_count >= @failure_threshold) :open else :closed end end // 调用 def call args case state when :closed, :half_open begin do_call args // 这里从描述来看应该是漏了调用reset方法 // reset rescue Timeout::Error record_failure raise $! end when :open raise CircuitBreaker::Open else raise "Unreachable" end end // 记录失败 def record_failure @failure_count += 1 @last_failure_time = Time.now @monitor.alert(:open_circuit) if :open == state end 复制代码
下面的多种实现的思路都是基于此伪代码的基本框架进行编写。
基于异常阈值不会自恢复的实现
这种实现最简单,也就是只需要维护状态Closed
转向Open
的临界条件即可,可以设定一个异常计数的阈值,然后使用一个原子计数器统计异常数量即可,Java
代码实现如下:
// 断路器状态 public enum CircuitBreakerStatus { /** * 关闭 */ CLOSED, /** * 开启 */ OPEN, /** * 半开启 */ HALF_OPEN } @Getter public class SimpleCircuitBreaker { private final long failureThreshold; private final LongAdder failureCounter; private final LongAdder callCounter; private CircuitBreakerStatus status; public SimpleCircuitBreaker(long failureThreshold) { this.failureThreshold = failureThreshold; this.callCounter = new LongAdder(); this.failureCounter = new LongAdder(); this.status = CircuitBreakerStatus.CLOSED; } private final Object fallback = null; @SuppressWarnings("unchecked") public <T> T call(Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this.status) { return supplier.get(); } } catch (Exception e) { this.failureCounter.increment(); tryChangingStatus(); } finally { this.callCounter.increment(); } return (T) fallback; } private void tryChangingStatus() { if (this.failureThreshold <= this.failureCounter.sum()) { this.status = CircuitBreakerStatus.OPEN; System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]", CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } public void call(Runnable runnable) { call(() -> { runnable.run(); return null; }); } } 复制代码
在多线程调用的前提下,如果在很短时间内有大量的线程中的方法调用出现异常,有可能所有调用都会涌进去tryChangingStatus()
方法,这种情况下会导致CircuitBreaker
的状态被并发修改,可以考虑使用AtomicReference
包裹CircuitBreakerStatus
,做CAS
更新(确保只更新一次)即可。变更的代码如下:
private final AtomicReference<CircuitBreakerStatus> status; public SimpleCircuitBreaker(long failureThreshold) { ...... this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED); } public <T> T call(Supplier<T> supplier) { try { if (CircuitBreakerStatus.CLOSED == this.status.get()) { return supplier.get(); } ...... private void tryChangingStatus() { if (this.failureThreshold <= this.failureCounter.sum()) { boolean b = this.status.compareAndSet(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN); if (b) { System.out.println(String.format("SimpleCircuitBreaker状态转换,[%s]->[%s]", CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)); } } } 复制代码
并发极高的场景下假设出现调用异常前提下,异常计数器failureCounter
的计数值有可能在一瞬间就远超过了异常阈值failureCounter
,但是一般不考虑对这些计数值的比较或者状态切换的准确时机添加同步机制(例如加锁),因为一旦加入同步机制会大大降低并发性能,这样引入断路器反而成为了性能隐患,显然是不合理的。所以一般设计断路器逻辑的时候,并不需要控制断路器状态切换的具体计数值临界点,保证状态一定切换正常即可。基于此简陋断路器编写一个同步调用的测试例子:
public static class Service { public String process(int i) { System.out.println("进入process方法,number:" + i); throw new RuntimeException(String.valueOf(i)); } } public static void main(String[] args) throws Exception { SimpleCircuitBreaker circuitBreaker = new SimpleCircuitBreaker(5L); Service service = new Service(); for (int i = 0; i < 10; i++) { int temp = i; String result = circuitBreaker.call(() -> service.process(temp)); System.out.println(String.format("返回结果:%s,number:%d", result, temp)); } } 复制代码
测试结果输出如下:
进入process方法,number:0 返回结果:null,number:0 进入process方法,number:1 返回结果:null,number:1 进入process方法,number:2 返回结果:null,number:2 进入process方法,number:3 返回结果:null,number:3 进入process方法,number:4 SimpleCircuitBreaker状态转换,[CLOSED]->[OPEN] 返回结果:null,number:4 返回结果:null,number:5 返回结果:null,number:6 返回结果:null,number:7 返回结果:null,number:8 返回结果:null,number:9 复制代码
细心的伙伴会发现,基本上状态的维护和变更和数据统计都位于调用异常或者失败的方法入口以及最后的finally代码块,在真实的调用逻辑前一般只会做状态判断或者下文提到的分配调用资源等。