深入理解Sentinel系列-2.Sentinel原理及核心源码分析(下)

简介: 深入理解Sentinel系列-2.Sentinel原理及核心源码分析

深入理解Sentinel系列-2.Sentinel原理及核心源码分析(上):https://developer.aliyun.com/article/1413968


熔断降级


Sentinel的熔断降级是一种服务保护机制,用于在系统出现异常或超载时,自动限制对受影响服务的访问,防止其继续受到压力而导致系统崩溃。熔断降级可以通过设置阈值和规则来实现,一旦达到设定的条件,系统将自动停止对服务的访问,直到服务恢复正常或超载情况解除。这样可以保护系统免受过载和异常情况的影响,确保系统的稳定性和可靠性。


熔断指标


怎么判断这个服务是一个不稳定的状态?


异常数:


平均响应时间:


异常比例数:10s内,20次请求里面,有百分之五十及以上的错误率,就认为其要触发熔断,而hystrix只有这一种


熔断规则


规则,也就是要设计对应的指标,指标设计为多少才会熔断!


比如说:


  • 1min内,异常数量超过50%,触发熔断
  • 1s 5个请求,平均响应时间超过一个阈值(1000ms)
  • 1min内,超过了多少个异常数量


熔断时间窗口


当处于熔断状态的时候,多长时间内,请求都不会发送到服务端,熔断窗口也是一个需要设置的阈值


熔断实践


public class DataSourceInitFunc implements InitFunc{
    public void init() throws Exception{
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule();
        rule.setResource("com.gupaodu.springcloud.dubbo.ISayHelloService");// 表示针对那个服务或者方法的熔断
        // 针对这个类下面所有的接口进行资源的判断,当这个类下面任何一个方法触发了熔断,那么调用这个类下面的任何一个接口都会自动触发降级。
        // 设置模式
        rule.setGrade(RuleConstant.Degrade.DEGRADE_CRADE_EXCEPTION_COUNT)// 错误数 超时时间 错误率 指标
        rule.setCount(3);// 阈值
        rule.setTimeWindow(100)// 时间窗口 100s
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
    }
}
public interface ISayHelloService{
    String sayHello(String msg);
  String exceptionTest();
}
@DubboService
public class SayHelloServiceImpl implement ISayHelloService{
    public String sayHello(String msg){
        return "";
    }
    // 用来触发测试
    public String exceptionTest(){
        throw new RuntimeException("biz exception");
    }
}
@RestController
public class SentinelController {
    @DubboReference(mock="......SayHelloServiceMock")
    ISayHelloService sayHelloService;
    @GetMapping("/say")
    public String say(){
        return sayHelloService.sayHello("");
    }
    @GetMapping("/exception")
    public String exception(){
        return sayHelloService.exceptionTest();
    }
}
public class SayHelloServiceMock implements ISayHelloService{
    public String sayHello(String msg){
        return "触发了降级,返回默认数据";
    }
    // 用来触发测试
    public String exceptionTest(){
        return "触发了降级,返回默认数据"
    }
}

其实本质就是 当触发熔断之后,该接口下其他方法在时间窗口内 也不能访问,只能被降级。


sentinel的实现


构建一个资源

设置一个规则


指标范围:


  • 当前的qps
  • 当前的总的并发数线程数
  • 当前失败请求数是多少?

此图为官方提供的图,一个请求过来,它会在 调用链路构建处 构建一个 Invocation Tree结构,也就是将我们请求的资源构建成一个节点,以树形的方式去构建。然后就是针对某个节点进行监控统计,主要是以环形数组的方式哦存,其实也就是每一个node维护了一个滑动窗口,其统计了针对这个资源总的信息去进行指标统计,然后通过一个判断责任链去鉴别触发了那些规则。


分析sentinel源码实现


public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
    }
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException {
        return this.entryWithType(name, resourceType, entryType, count, false, args);
    }
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return this.entryWithPriority(resource, count, prioritized, args);
    }
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    // 获取线程上下文
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
        } else {
            if (context == null) {
                context = CtSph.InternalContextUtil.internalEnter("sentinel_default_context");
            }
            // 是否开启或关闭流控
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
            } else {
                // 构建了一个调用链
                ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);
------------------------------------------------------------------------------
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized(LOCK) {
                chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
                if (chain == null) {
                    // 超过阈值就直接返回null
                    if (chainMap.size() >= 6000) {
                        return null;
                    }
          // 单例模式创建调用链
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                    // 如果直接添加进去的话,涉及到扩容,所以采用创建之后赋值的方法
                }
            }
        }
        return chain;
    }
public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        } else {
            slotChainBuilder = (SlotChainBuilder)SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
            if (slotChainBuilder == null) {
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default", new Object[0]);
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", new Object[]{slotChainBuilder.getClass().getCanonicalName()});
            }
            return slotChainBuilder.build();
        }
    }
 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        Iterator var3 = sortedSlotList.iterator();
    // 然后在这里进行一个遍历
        while(var3.hasNext()) {
            ProcessorSlot slot = (ProcessorSlot)var3.next();
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
            } else {
                chain.addLast((AbstractLinkedProcessorSlot)slot);
            }
        }
        return chain;
    }
// 在这里面涉及到了SpiLoader
// 其实也就是在这个链路中,可以进入到我们自己的 自定义的slot 中

if (chain == null) {
                    return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
                } else {
                    CtEntry e = new CtEntry(resourceWrapper, chain, context);
                    try {
                        // 构建好以后然后通过链式去处理
                        chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);
                    } catch (BlockException var9) {
                        e.exit(count, args);
                        throw var9;
                    } catch (Throwable var10) {
                        RecordLog.info("Sentinel unexpected exception", var10);
                    }
                    return e;
                }
            }
        }
    }

这两个就是前面说的滑动窗口要去统计的东西


前面传进来的资源包装成一个 resourceWrapper对象

当我们打算进入 chain.entry 的时候,发现其实就是一系列的slot


树的构建 NodeSelectorSlot


public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        DefaultNode node = (DefaultNode)this.map.get(context.getName());
    // 双重检查锁 + 缓存
        if (node == null) {
            synchronized(this) {
                node = (DefaultNode)this.map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, (ClusterNode)null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap(this.map.size());
                    cacheMap.putAll(this.map);
                    cacheMap.put(context.getName(), node);
                    this.map = cacheMap;
                    ((DefaultNode)context.getLastNode()).addChild(node);
                }
            }
        }
        context.setCurNode(node);
    // 当上述构建操作完成后
    // 释放entry
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
// 释放的本质其实就是执行下一个
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (this.next != null) {
            this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }


统计监控 StatisticSlot


实际上是并发量比较大的情况下做数据统计

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        Iterator var8;
        ProcessorSlotEntryCallback handler;
        try {
            // 先交给后续的slot进行处理
            this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // 然后开始统计资源
            // node代表当前传过来的资源
            node.increaseThreadNum();
            node.addPassRequest(count);
-----------------------------------------------------------------------------------
public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }
public void addPassRequest(int count) {
        this.rollingCounterInSecond.addPass(count); // 秒级别的统计
        this.rollingCounterInMinute.addPass(count); // 分钟级别的统计
    }
this.rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
SampleCountProperty.SAMPLE_COUNT = 2;
IntervalProperty.INTERVAL = 1000;
// 上面有点像滑动窗口的概念
public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = this.data.currentWindow();
    // 根据某一个指标加进去 这里面的本质就是通过当前的时间去得到一个窗口,将其加进去
    // LongAdder的思想 其底层就是数组累加的思想 !!!
        ((MetricBucket)wrap.value()).addPass(count);
    }
 // 当最终计数完成后,相当于形成了一开始的那个图的样子
private final LeapArray<MetricBucket> data;
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }
            // super(sampleCount, intervalInMs);
public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount; // 每个窗口的时间长度
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = (double)intervalInMs / 1000.0D;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray(sampleCount); // sampleCount = 2
      // 也就代表着窗口的大小是两个
    }
protected final AtomicReferenceArray<WindowWrap<T>> array;

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }
            Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
            while(var13.hasNext()) {
                ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException var10) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException var11) {
            BlockException e = var11;
            context.getCurEntry().setBlockError(var11);
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }
            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }
            throw e;
        } catch (Throwable var12) {
            context.getCurEntry().setError(var12);
            throw var12;
        }
    }


流量控制 FlowSlot


public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        this.checkFlow(resourceWrapper, context, node, count, prioritized);
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        this.checker.checkFlow(this.ruleProvider, resource, context, node, count, prioritized);
    }
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider != null && resource != null) {
            // 获取针对这个资源的限流规则
            Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());
            if (rules != null) {
                Iterator var8 = rules.iterator();
                while(var8.hasNext()) {
                    FlowRule rule = (FlowRule)var8.next();
                    if (!this.canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }
        }
    }
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        } else {
            return rule.isClusterMode() ? passClusterCheck(rule, context, node, acquireCount, prioritized) : passLocalCheck(rule, context, node, acquireCount, prioritized);
        }
    }
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
      // 在这里再根据不同的策略去决定拒绝的要怎么办
        return selectedNode == null ? true : rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
      // 根据不同的来源进行限流
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else if ("default".equals(limitApp)) {
            return (Node)(strategy == 0 ? node.getClusterNode() : selectReferenceNode(rule, context, node));
        } else if ("other".equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else {
            return null;
        }
    }
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
      // 根据资源获取当前的使用量
        int curCount = this.avgUsedTokens(node);
      // 如果当前使用量 + 请求 > 阈值 则按照策略进行限流
        if ((double)(curCount + acquireCount) > this.count) {
            if (prioritized && this.grade == 1) {
                long currentTime = TimeUtil.currentTimeMillis();
                long waitInMs = node.tryOccupyNext(currentTime, acquireCount, this.count);
                if (waitInMs < (long)OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    this.sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        } else {
            return true;
        }
    }


目录
相关文章
|
25天前
|
监控 Java 应用服务中间件
Sentinel原理及实践
Sentinel原理及实践
36 1
|
2月前
|
监控 NoSQL 程序员
Redis 高可用篇:你管这叫 Sentinel 哨兵集群原理
Redis 高可用篇:你管这叫 Sentinel 哨兵集群原理
93 5
|
2月前
|
监控 算法 Java
sentinel 服务限流工作原理
sentinel 服务限流工作原理
|
2月前
|
存储 监控 测试技术
深入理解Sentinel系列-2.Sentinel原理及核心源码分析(上)
深入理解Sentinel系列-2.Sentinel原理及核心源码分析
177 0
|
7月前
|
算法 Java BI
Sentinel为什么这么强,我忍不住扒了扒背后的实现原理
大家好,我是三友~~ 最近我在整理代码仓库的时候突然发现了被尘封了接近两年之久的Sentinel源码库 两年前我出于好奇心扒了一下Sentinel的源码,但是由于Sentinel本身源码并不复杂,在简单扒了扒之后几乎就再没扒过了 那么既然现在又让我看到了,所以我准备再来好好地扒一扒,然后顺带写篇文章来总结一下。
Sentinel为什么这么强,我忍不住扒了扒背后的实现原理
|
11月前
|
算法 Java Sentinel
sentinel架构底层原理剖析详解
sentinel架构底层原理剖析详解
120 0
|
11月前
|
缓存 监控 Java
sentinel的原理以及基本使用
sentinel的原理以及基本使用
482 0
|
12月前
|
存储 负载均衡 监控
Sentinel工作原理
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。 只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
69 0
|
自然语言处理 算法 Java
深扒Sentinel背后的实现原理之后,我终于明白它为什么这么强了
最近我在整理代码仓库的时候突然发现了被尘封了接近两年之久的Sentinel源码库 两年前我出于好奇心扒了一下Sentinel的源码,但是由于Sentinel本身源码并不复杂,在简单扒了扒之后几乎就再没扒过了 那么既然现在又让我看到了,所以我准备再来好好地扒一扒,然后顺带写篇文章来总结一下。
|
9天前
|
监控 Java Sentinel
使用Sentinel进行服务调用的熔断和限流管理(SpringCloud2023实战)
Sentinel是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
26 3