开篇
本文很长,请耐心看完。另外,由于直接放源码很是影响手机阅读体验,我把源码都截成图片了。
大部分文章只是说明每个模块的职责和功能,考虑到如果只是单纯说明,读者还是很难把整体的流程连接起来。因此我打算从这个例子一步步解析。
就直接从开头 hystrix.ConfigureCommand 开始吧。
上篇文章提过,这个操作主要是为每个 commandName 自定义自己的规则配置。如果未自定义,那么会使用默认值。 最终会把配置值存储在 circuitSettings 这个 map 类型中,它的初始化操作是在 init() 执行的。
接下来执行 hystrix.Do, Do 是一个同步的操作,它会阻塞等待,直到执行函数结束或者熔断器返回错误,如:断路器开启、超出最大并发数。
此函数需要三个参数,第一个参数表示 commandName 的名称,第二个参数就是正常的业务的匿名函数,比如在函数中进行外部服务调用。如果调用失败,那么就会执行第三个参数的操作,我们可以称之为保底操作,当熔断器开启的时候,系统也是会直接调用此函数。这两个参数的类型分别是匿名函数和闭包函数。
从图中可以看出,Do 函数只是把传入的后两个参数进一步封装成函数。然后调用 DoC。
Doc 函数第一个参数是上下文 context.Context。context.Context 一般出现在不同 Goroutine 之间同步指定数据。如果你使用过 gin 框架,经常和它打交道。
第三和第四的参数即 Do 中进一步包装的两个闭包函数。所谓闭包,我的理解是:存在自由的变量。这个自由的变量取决于运行闭包函数时的环境,在 DoC 中,runFunc 和 fallbackFuncC 类型
也就是说这两个闭包的自由变量是 context.Context。
Doc 函数中 变量 r 和 f 不再解释。由于我们在调用 Do 函数时传递了第三个参数,因此执行 errChan = GoC(ctx, name, r, f)。最下面使用 select 可以监控多 channel。当某个 channel 有数据时,从其中读取。我们接着往下看 GoC。
Goc 是核心代码块。它是真正执行你的核心业务函数的地方。我先大体介绍一些这个函数核心功能。
它会先去验证一些规则,比如判断熔断器是否开启,决定当前是否可以执行你的业务。判断是否可以获取访问令牌。如果可以,执行你的业务逻辑,成功了上报成功的状态,失败了,除了上报状态,如果传入了异常处理的函数,那么执行异常处理的函数。另外还有一些归还令牌等操作。我们来看代码。
这段代码就不解释了吧。但是我们可以来看看 command 结构体中还有啥参数。
command 里面,关键的两个字段是 events 和 circuit。其实 events 主要是存储事件类型信息,比如执行成功的 success,或者失败的 timeout、context_canceled 等。 circuit 是指针类型 CircuitBreaker, CircuitBreaker 就是真正的熔断器 。command 主要是记录单个执行的状态以及和熔断器进行一些运行交互。交互什么?主要向 CircuitBreaker 上报执行状态事件。
接下来看下面的代码
GetCircuit(name),从函数名就知道是通过名称获取一个熔断器。
这个函数代码很清晰,如果没有从 circuitBreakers 中查询到对应的 CircuitBreaker,那么就创建一个。
这里的代码有点小细节。首先 circuitBreakers 是 map 类型,我们都知道 map 并不是并发安全的。所以在查找的的时候加了读锁。如果没找到值,那么解除读锁。尝试获取写锁,我们在写锁里面,又进一步确认是否存在 circuitBreakers,为什么需要这样操作?这是因为在我们释放读锁到获取写锁过程中有可能存在其他的 Goroutine 抢先一步创建。所以这里需要进一步确认,此时不存在,那就真的不存在,通过 name 生成一个 circuitBreakers。具体看下 newCircuitBreaker(name) 函数。
主要是初始化创建一个熔断器 CircuitBreaker 操作,我们可以看看 CircuitBreaker 都有哪些字段。
主要说明几个字段,open 表示当前熔断器是否开启,executorPool 是流量控制中心,所有的请求都需要先获取到令牌。metrics 的类型是 *metricExchange, 可以看成是上报执行状态事件的载体。通过它把执行状态信息存储到实际熔断器执行各个维度状态 (成功次数,失败次数,超时……) 的数据集合中。
newCircuitBreaker(name) 初始化的同时也初始化了 executorPool 和 metrics。
先看 newMetricExchange(name)。看看它 metricExchange 结构
Updates 是一个 channel 类型,通过 Updates 上报执行事件集合。metricCollectors 存储的是 metricCollector.MetricCollector 切片,而 metricCollector.MetricCollector 是一个接口类型。
newMetricExchange(name) 中,
可以看到,初始化 Updates 通道的的容量是 2000。初始化 metricCollectors 主要逻辑在 InitializeMetricCollectors。
关键的地方我标明了。再看看 newDefaultMetricCollector
此函数返回一个 MetricCollector 类型,结构体 DefaultMetricCollector 实现了 MetricCollector 所有方法。再看看 DefaultMetricCollector,不正是存储熔断器执行状态所有信息嘛。