- 1min内,异常数量超过50%,触发熔断
- 1s 5个请求,平均响应时间超过一个阈值(1000ms)
- 1min内,超过了多少个异常数量
public class DataSourceInitFunc implements InitFunc{ public void init() throws Exception{ List<DegradeRule> rules = new ArrayList<>(); DegradeRule rule = new DegradeRule(); rule.setResource("com.gupaodu.springcloud.dubbo.ISayHelloService");// 表示针对那个服务或者方法的熔断 // 针对这个类下面所有的接口进行资源的判断,当这个类下面任何一个方法触发了熔断,那么调用这个类下面的任何一个接口都会自动触发降级。 // 设置模式 rule.setGrade(RuleConstant.Degrade.DEGRADE_CRADE_EXCEPTION_COUNT)// 错误数 超时时间 错误率 指标 rule.setCount(3);// 阈值 rule.setTimeWindow(100)// 时间窗口 100s rules.add(rule); DegradeRuleManager.loadRules(rules); } } public interface ISayHelloService{ String sayHello(String msg); String exceptionTest(); } @DubboService public class SayHelloServiceImpl implement ISayHelloService{ public String sayHello(String msg){ return ""; } // 用来触发测试 public String exceptionTest(){ throw new RuntimeException("biz exception"); } } @RestController public class SentinelController { @DubboReference(mock="......SayHelloServiceMock") ISayHelloService sayHelloService; @GetMapping("/say") public String say(){ return sayHelloService.sayHello(""); } @GetMapping("/exception") public String exception(){ return sayHelloService.exceptionTest(); } } public class SayHelloServiceMock implements ISayHelloService{ public String sayHello(String msg){ return "触发了降级,返回默认数据"; } // 用来触发测试 public String exceptionTest(){ return "触发了降级,返回默认数据" } }
其实本质就是 当触发熔断之后,该接口下其他方法在时间窗口内 也不能访问,只能被降级。
- 当前的qps
- 当前的总的并发数线程数
- 当前失败请求数是多少?
- …
此图为官方提供的图,一个请求过来,它会在 调用链路构建处 构建一个 Invocation Tree结构,也就是将我们请求的资源构建成一个节点,以树形的方式去构建。然后就是针对某个节点进行监控统计,主要是以环形数组的方式哦存,其实也就是每一个node维护了一个滑动窗口,其统计了针对这个资源总的信息去进行指标统计,然后通过一个判断责任链去鉴别触发了那些规则。
public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException { return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0); } public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException { return this.entryWithType(name, resourceType, entryType, count, false, args); } public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType); return this.entryWithPriority(resource, count, prioritized, args); } private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 获取线程上下文 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { return new CtEntry(resourceWrapper, (ProcessorSlot)null, context); } else { if (context == null) { context = CtSph.InternalContextUtil.internalEnter("sentinel_default_context"); } // 是否开启或关闭流控 if (!Constants.ON) { return new CtEntry(resourceWrapper, (ProcessorSlot)null, context); } else { // 构建了一个调用链 ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper); ------------------------------------------------------------------------------ ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper); if (chain == null) { synchronized(LOCK) { chain = (ProcessorSlotChain)chainMap.get(resourceWrapper); if (chain == null) { // 超过阈值就直接返回null if (chainMap.size() >= 6000) { return null; } // 单例模式创建调用链 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; // 如果直接添加进去的话,涉及到扩容,所以采用创建之后赋值的方法 } } } return chain; } public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder != null) { return slotChainBuilder.build(); } else { slotChainBuilder = (SlotChainBuilder)SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); if (slotChainBuilder == null) { RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default", new Object[0]); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", new Object[]{slotChainBuilder.getClass().getCanonicalName()}); } return slotChainBuilder.build(); } } public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); Iterator var3 = sortedSlotList.iterator(); // 然后在这里进行一个遍历 while(var3.hasNext()) { ProcessorSlot slot = (ProcessorSlot)var3.next(); if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]); } else { chain.addLast((AbstractLinkedProcessorSlot)slot); } } return chain; } // 在这里面涉及到了SpiLoader // 其实也就是在这个链路中,可以进入到我们自己的 自定义的slot 中
if (chain == null) { return new CtEntry(resourceWrapper, (ProcessorSlot)null, context); } else { CtEntry e = new CtEntry(resourceWrapper, chain, context); try { // 构建好以后然后通过链式去处理 chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args); } catch (BlockException var9) { e.exit(count, args); throw var9; } catch (Throwable var10) { RecordLog.info("Sentinel unexpected exception", var10); } return e; } } } }
前面传进来的资源包装成一个 resourceWrapper对象
当我们打算进入 chain.entry 的时候,发现其实就是一系列的slot
树的构建 NodeSelectorSlot
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = (DefaultNode)this.map.get(context.getName()); // 双重检查锁 + 缓存 if (node == null) { synchronized(this) { node = (DefaultNode)this.map.get(context.getName()); if (node == null) { node = new DefaultNode(resourceWrapper, (ClusterNode)null); HashMap<String, DefaultNode> cacheMap = new HashMap(this.map.size()); cacheMap.putAll(this.map); cacheMap.put(context.getName(), node); this.map = cacheMap; ((DefaultNode)context.getLastNode()).addChild(node); } } } context.setCurNode(node); // 当上述构建操作完成后 // 释放entry this.fireEntry(context, resourceWrapper, node, count, prioritized, args); } // 释放的本质其实就是执行下一个 public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (this.next != null) { this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } }
统计监控 StatisticSlot
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { Iterator var8; ProcessorSlotEntryCallback handler; try { // 先交给后续的slot进行处理 this.fireEntry(context, resourceWrapper, node, count, prioritized, args); // 然后开始统计资源 // node代表当前传过来的资源 node.increaseThreadNum(); node.addPassRequest(count); ----------------------------------------------------------------------------------- public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } public void addPassRequest(int count) { this.rollingCounterInSecond.addPass(count); // 秒级别的统计 this.rollingCounterInMinute.addPass(count); // 分钟级别的统计 } this.rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); SampleCountProperty.SAMPLE_COUNT = 2; IntervalProperty.INTERVAL = 1000; // 上面有点像滑动窗口的概念 public void addPass(int count) { WindowWrap<MetricBucket> wrap = this.data.currentWindow(); // 根据某一个指标加进去 这里面的本质就是通过当前的时间去得到一个窗口,将其加进去 // LongAdder的思想 其底层就是数组累加的思想 !!! ((MetricBucket)wrap.value()).addPass(count); } // 当最终计数完成后,相当于形成了一开始的那个图的样子 private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs); } // super(sampleCount, intervalInMs); public LeapArray(int sampleCount, int intervalInMs) { AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); this.windowLengthInMs = intervalInMs / sampleCount; // 每个窗口的时间长度 this.intervalInMs = intervalInMs; this.intervalInSecond = (double)intervalInMs / 1000.0D; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray(sampleCount); // sampleCount = 2 // 也就代表着窗口的大小是两个 } protected final AtomicReferenceArray<WindowWrap<T>> array;
if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator(); while(var13.hasNext()) { ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next(); handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException var10) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); } var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator(); while(var8.hasNext()) { handler = (ProcessorSlotEntryCallback)var8.next(); handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException var11) { BlockException e = var11; context.getCurEntry().setBlockError(var11); node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseBlockQps(count); } var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator(); while(var8.hasNext()) { handler = (ProcessorSlotEntryCallback)var8.next(); handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable var12) { context.getCurEntry().setError(var12); throw var12; } }
流量控制 FlowSlot
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { this.checkFlow(resourceWrapper, context, node, count, prioritized); this.fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { this.checker.checkFlow(this.ruleProvider, resource, context, node, count, prioritized); } public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider != null && resource != null) { // 获取针对这个资源的限流规则 Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName()); if (rules != null) { Iterator var8 = rules.iterator(); while(var8.hasNext()) { FlowRule rule = (FlowRule)var8.next(); if (!this.canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } } public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { String limitApp = rule.getLimitApp(); if (limitApp == null) { return true; } else { return rule.isClusterMode() ? passClusterCheck(rule, context, node, acquireCount, prioritized) : passLocalCheck(rule, context, node, acquireCount, prioritized); } } private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); // 在这里再根据不同的策略去决定拒绝的要怎么办 return selectedNode == null ? true : rule.getRater().canPass(selectedNode, acquireCount, prioritized); } static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) { String limitApp = rule.getLimitApp(); int strategy = rule.getStrategy(); String origin = context.getOrigin(); // 根据不同的来源进行限流 if (limitApp.equals(origin) && filterOrigin(origin)) { return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node); } else if ("default".equals(limitApp)) { return (Node)(strategy == 0 ? node.getClusterNode() : selectReferenceNode(rule, context, node)); } else if ("other".equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node); } else { return null; } } public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 根据资源获取当前的使用量 int curCount = this.avgUsedTokens(node); // 如果当前使用量 + 请求 > 阈值 则按照策略进行限流 if ((double)(curCount + acquireCount) > this.count) { if (prioritized && this.grade == 1) { long currentTime = TimeUtil.currentTimeMillis(); long waitInMs = node.tryOccupyNext(currentTime, acquireCount, this.count); if (waitInMs < (long)OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); this.sleep(waitInMs); throw new PriorityWaitException(waitInMs); } } return false; } else { return true; } }