我们在项目中添加 Spring Cloud Sentinel 依赖添加后 spring-cloud-starter-alibaba-sentinel
在 Spring-Boot 启动的过程中回去初始化 spring.factories
中的配置信息,如:SentinelWebAutoConfiguration
、SentinelAutoConfiguration
等配置文件来初始化
再讲代码之前我先声明一下我的版本号sentinel 1.8.0
。后续的所有内容均基于该版本进行
@ResoureSetinel 工作原理
配置流控规则我们最简单的方式就是通过 @ResoureSetinel 的方式来管理,该注解可以直接定义流控规则、降级规则。下面是一个简单的使用例子:
@SentinelResource(value = "ResOrderGet", fallback = "fallback", fallbackClass = SentinelResourceExceptionHandler.class, blockHandler = "blockHandler", blockHandlerClass = SentinelResourceExceptionHandler.class )@GetMapping("/order/get/{id}")public CommonResult<StockModel> getStockDetails(@PathVariable Integer id) { StockModel stockModel = new StockModel(); stockModel.setCode("STOCK==>1000"); stockModel.setId(id); return CommonResult.success(stockModel);}
如果大家熟悉 Spring 相关的组件大家都可以想到,这里多半是通过Spring Aop. 的方式来拦截 getStockDetails
方法。我们先看看SentinelAutoConfiguration
配置文件,我们可以找到 SentinelResourceAspect
Bean 的定义方法。
@Bean@ConditionalOnMissingBeanpublic SentinelResourceAspect sentinelResourceAspect() { return new SentinelResourceAspect();}
让后我们再来看看 SentinelResourceAspect 具体是怎么处理的,源码如下:
// 定义 Pointcut@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")public void sentinelResourceAnnotationPointcut() {}// Around 来对被标记 @SentinelResource 注解的方法进行处理@Around("sentinelResourceAnnotationPointcut()")public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); // 获取注解信息 SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); // 获取资源名称 String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 执行 entry entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); // 执行业务方法 Object result = pjp.proceed(); // 返回 return result; } catch (BlockException ex) { // 处理 BlockException return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); // 处理降级 return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; }}
我们总结一下, @SentinelResource 的执行过程, 首先是通过 Aop 进行拦截,然后通过 SphU.entry
执行对应的流控规则,最后调用业务方法。如果触发流控规则首先处理流控异常 BlockException
然后在判断是否有服务降级的处理,如果有就调用 fallback
方法。通过 handleBlockException
、handleFallback
进行处理。
责任链模式处理流控
通过上面的梳理,我们知道对于流控的过程,核心处理方法就是 SphU.entry
。在这个方法中其实主要就是初始化流控 Solt 和执行 Solt. 在这个过程中会对:簇点定义、流量控制、熔断降级、系统白名单等页面功能进行处理。
1. 初始化责任链
下面是初始化 Solt 的核心代码在 SphU.entryWithPriority
// 删减部分代码private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 初始化责任链 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry(resourceWrapper, chain, context); try { // 执行 entry chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); // 异常抛出,让 SentinelResourceAspect.invokeResourceWithSentinel 统一处理 throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e;}
通过 lookProcessChain
方法我逐步的查找,我们可以看到最终的责任链初始化类,默认是 DefaultSlotChainBuilder
public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // Note: the instances of ProcessorSlot should be different, since they are not stateless. // 通过 SPI 去加载所有的 ProcessorSlot 实现,通过 Order 排序 List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } // 添加到 chain 尾部 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; }}
2. 责任链的处理过程
我们可以通过断点的方式来查看在 sortedSlotList 集合中所有的 solt
顺序如下图所示:
publicProcessorslotchainbui)
37
ProccssorstotchaanGautProcssorochanhatcrohan
38
39
Note:theinstancssofProcesorstotshoudedferentsnethette
40
ListProcessorsiotsorteiotitS
41
sortedslotlist)sortedsotList:
foR(Processorslotslot
9
42
siZe
if(!(slotinstanceofAbstractLinkedProcessorsot))
43
")isnot
RecordLog.arnl"TheProcessorstot+soe
44
45
continve
人
46
47
chain.addLast((AbstractLinkedProcessorslot)s
48
DefaultslotchainBuilder?buildo
StockApplication-Node1x
pplication
与美
土n
马
Vartables
thisgDefaultslotChainBuilder@8013)
个
ain":RUNNING
chain-IDefaultProcessorlotchain@80191
a.csp.sendinel.s/ots)
sortedSlotListArrayList@8o2o)size
aba.csp.seninel.s/otchain)
0-NodeSelectorslot@8037)
sp.sentinel
三1(ClusterBuilderslot@8038)
o.sentine/
三2LogSlot@8039
sentinel
3-`Statisticslot@8040)
sentinel
00
三4-(AuthoritySlot@8041)
三5-`Systemslot@8042)
sentinel.adapter.servlet
三6-ParamFlowslot@8043)
g.apache.catalina.core)
7(FIowSlot@8044)
he.catalina.core)
三8(DegradeSlot@8045]
.springtramewark.web.filter)
tramework.web.ailter)
g.apache.catalina.core
我们可以通过如下的顺序进行逐个的简单的分析一下
- NodeSelectorSolt
- CusterBuilderSolt
- LogSlot
- StatisicSlot
- AuthoritySolt
- SystemSolts
- ParamFlowSolt
- FlowSolt
- DegradeSlot
对于 Sentinel 的 Slot 流控协作流程可以参考官方给出的文档, 如下图所示:
Slotchain
TreeNodeBuilder
ClusterNodeBuilder
Statisticslot
Tree
Timestamp:1527574049
ClusterNode
Node
Rt:20
Thread:1
Root
node
Qps:5
Exception:0
月
Success1
红
2
entry
entry
entry
nil
node
t
3
2
Timestamp:1527574449
TI
Rt:18
Thread:0
o
default
Defaul
node
Qps:5
ode
tnode
Exception:0
3
0o
Success:2
Runtimestatistics
Action
Flowslot
Systemslot
Authorizeslot
Degradeslot
rules
FlowSolt 流控
通过 NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的请求数据处理,在 FlowSolt 会进入流控规则,所有的 Solt 都会执行 entry 方法, 如下所示
// FlowSolt 的 entry 方法@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 检查流量 checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args);}
在后续的流程中,会执进行判断具体的流控策略,默认是快速失败,会执行 DefaultController
方法。
// DefaultController@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) { // 当前资源的调用次数 int curCount = avgUsedTokens(node); // 当前资源的调用次数 + 1 > 当前阈值 if (curCount + acquireCount > count) { // 删减比分代码 // 不通过 return false; } // 通过 return true;}private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}
如果上面返回不通过会回到,那么会抛出 FlowException
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { // 流控规则不通过,会抛出 FlowException throw new FlowException(rule.getLimitApp(), rule); } } }}
然后会在 StatisticSlot
中增加统计信息, 最后会抛出给 SentinelResourceAspect
进行处理,完成流控功能。我们再来看看这个异常信息,如果是BlockException
异常,会进入 handleBlockException
方法处理, 如果是其他的业务异常首先会判断是否有配置 fallback 处理如果有,就调用 handleFallback
没有就继续往外抛,至此完成流控功能
try { entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); Object result = pjp.proceed(); return result;} catch (BlockException ex) { return handleBlockException(pjp, annotation, ex);} catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex;}
DegradeSlot 降级
断路器的作用是当某些资源一直出现故障时,将触发断路器。断路器不会继续访问已经发生故障的资源,而是拦截请求并返回故障信号。
Sentinel 在 DegradeSlot
这个 Slot 中实现了熔断降级的功能,它有三个状态 OPEN
、HALF_OPEN
、CLOSED
以ResponseTimeCircuitBreaker
RT 响应时间维度来分析, 断路器工作的过程。下面是一个标准断路器的工作流程:
Closed
Open
HalfOpen
在 Sentinel 实现的源码过程如下图所示:
DegradeSlot.entry
ExceptionCircuitBreaker
异常次数
cb.tryPass(c
慢调用比率
ResponseTimeCircuitBreaker
执行判断
ontext)
performChecking
断路关闭
currentState.geto
StatE.CLOSED
是否进入下次重试
retryTimeoutArrived0&&
执行业务方法
fromopenToHalfOpen(context)
打开到半开,最多放行一个请求
触发熔断,触发DegradeException
entry.exit(.pip.getargso)
熔断处理
ExceptionCircuitBreaker
DegradeSlot.exit
ResponseTimeCircuitBreaker
异常数
爱调用比率
调用次数加
circuitBreaker.onRequestComplete(context);
counter.totalCount.ada(1):
处理断路器状态
handleStateChangewhenThresholdExceeded(rt)
打开状态
currentState.getO)State.OPEN
半开状态
SlowCount+counterslowCount.sum0;
totalCount+counter.totaicount.sum0;
关闭状态
cucrentState.getO
State.HALFOPEN
请求统计
是否大于最大响应时间
是否大于慢调用比率
代t>maxAllowedRt
currentRatio>maxSlowRequestRatio
是
fromHalfOpenToclose0;
transformToOpen(currentRatiol;
fromHalfOpenToOpen(1.0d);
关闭断路器
继续打开
打开断路器
设置下次重试时间
重置计数
resetStat0:
updateNextRetryTimestampl;
结束
Sentinel 通过 Web 拦截器
Sentinel 在默认情况下, 不使用 @ResourceSentinel 注解实现流控的时候, Sentinel 通过拦截器进行流控实现的。初始化类在 SentinelWebAutoConfiguration
它实现了 WebMvcConfigurer
接口,在 sentinelWebInterceptor
方法初始化 SentinelWebInterceptor
等 Bean。
@Bean@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)public SentinelWebInterceptor sentinelWebInterceptor( SentinelWebMvcConfig sentinelWebMvcConfig) { return new SentinelWebInterceptor(sentinelWebMvcConfig);}
我们在 SentinelWebInterceptor
的核心方法 preHandle
中处理,这里面我们又可以看到 SphU.entry
熟悉的过程调用流控的责任链。由于逻辑都类似,此处不再多说。代码如下:
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { try { String resourceName = getResourceName(request); if (StringUtil.isEmpty(resourceName)) { return true; } if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) { return true; } // Parse the request origin using registered origin parser. String origin = parseOrigin(request); String contextName = getContextName(request); ContextUtil.enter(contextName, origin); Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN); request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry); return true; } catch (BlockException e) { try { handleBlockException(request, response, e); } finally { ContextUtil.exit(); } return false; }}