RateLimiter:结合Spring Aop应用,以及SmoothBursty原理分析

简介: RateLimiter:结合Spring Aop应用,以及SmoothBursty原理分析

RateLimiter


RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。

令牌桶算法

令牌桶算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。


RateLimiter、Semaphore 区别

RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。与Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。(注意尽管并发性和速率是紧密相关的,比如参考Little定律


RateLimiter 使用

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>

最新版本链接:https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22com.google.guava%22%20AND%20a%3A%22guava%22


RateLimiter方法摘要

修饰符和类型 方法和描述
double acquire()
从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
double acquire(int permits)
从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
static RateLimiter create(double permitsPerSecond)
根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
double getRate()
返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
void setRate(double permitsPerSecond)
更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
String toString()
返回对象的字符表现形式
boolean tryAcquire()
从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)
从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
boolean tryAcquire(long timeout, TimeUnit unit)
从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)


RateLimiter:结合Spring Aop应用

依赖

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
</dependency>


代码实现

定义注解

    @Inherited
    @Documented
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RateLimit {
        double limitNum() default 20;  //默认每秒放入桶中的token
    }


封装定义返回结果

public class MyResult {
private Integer status;
private String msg;
private List<Object> data;
public MyResult(Integer status, String msg, List<Object> data) {
  this.status = status;
  this.msg = msg;
  this.data = data;
}
public static MyResult OK(String msg, List<Object> data) {
  return new MyResult(200, msg, data);
}
public static MyResult Error(Integer status, String msg) {
  return new MyResult(status, msg, null);
}


aop实现

@Component
@Scope
@Aspect
public class RateLimitAspect {
    private Logger log = LoggerFactory.getLogger(this.getClass());
    //用来存放不同接口的RateLimiter(key为接口名称,value为RateLimiter)
    //在有RBAC框架的项目中,这个地方可以使用加载数据库配置的办法
    private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();
    private static ObjectMapper objectMapper = new ObjectMapper();
    private RateLimiter rateLimiter;
    @Autowired
    private HttpServletResponse response;
    @Pointcut("@annotation(com.icat.retalimitaop.annotation.RateLimit)")
    public void serviceLimit() {
    }
    @Around("serviceLimit()")
    public Object around(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        Object obj = null;
        //获取拦截的方法名
        Signature sig = joinPoint.getSignature();
        //获取拦截的方法名
        MethodSignature msig = (MethodSignature) sig;
        //返回被织入增加处理目标对象
        Object target = joinPoint.getTarget();
        //为了获取注解信息
        Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes());
        //获取注解信息
        RateLimit annotation = currentMethod.getAnnotation(RateLimit.class);
        double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token
        String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略
        //获取rateLimiter
         if(map.containsKey(functionName)){
             rateLimiter = map.get(functionName);
         }else {
             map.put(functionName, RateLimiter.create(limitNum));
             rateLimiter = map.get(functionName);
         }
        try {
            if (rateLimiter.tryAcquire()) {
                //执行方法
                obj = joinPoint.proceed();
            } else {
                //拒绝了请求(服务降级)
                String result = objectMapper.writeValueAsString(MyResult.Error(500, "系统繁忙!"));
                log.info("拒绝了请求:" + result);
                outErrorResult(result);
            }
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return obj;
    }
    //将结果返回
    public void outErrorResult(String result) {
        response.setContentType("application/json;charset=UTF-8");
        try (ServletOutputStream outputStream = response.getOutputStream()) {
            outputStream.write(result.getBytes("utf-8"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    static {
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }
}


接口限流

// 每秒限流5个
@RateLimit(limitNum = 5.0)
public MyResult getResults() {
  log.info("调用了方法getResults");
  return MyResult.OK("调用了方法", null);
}
// 每秒限流10个
@RateLimit(limitNum = 10.0)
public MyResult getResultTwo() {
  log.info("调用了方法getResultTwo");
  return MyResult.OK("调用了方法getResultTwo", null);
}


RateLimiter 核心思想

RateLimiter内部有两个实现:SmoothBursty和SmoothWarmingUp

RateLimiter主要的类的类图


SmoothBursty

  • SmoothBursty限流器使用令牌桶算法实现,这个限流器在空闲时候能够存储一定的令牌(默认是1秒钟时间产生的令牌),可以应对空闲一段时间后突然的爆发量请求。
  • guava的RateLimiter有一个核心的设计思想:当前请求的债务(请求的令牌大于限流器存储的令牌数)由下一个请求来偿还(上个请求亏欠的令牌,下个请求需要等待亏欠令牌生产出来以后才能被授权)。


SmoothBursty特点:

  • 令牌桶限流器:使用令牌桶算法实现的限流器。
  • 稳定限流:当流量饱和时,限流器会按照固定的速率来生产令牌,从而请求按照固定的等待时间来执行。
  • 能应对爆发流量当限流器被闲置一段时间后,会存储1秒钟生产的令牌,这时如果突然有爆发流量到来,则直接从桶中获取令牌而无需等待。


SmoothBursty关键属性

  • stableIntervalMicros,稳定生成令牌的时间间隔(微秒),公式为:stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond。
  • maxBurstSeconds,能够存储多少秒生产的令牌,默认是1秒。
  • maxPermits,最大能够存储的令牌数,限流器随着时间推进,会不断的生产令牌,可以想象为有一个桶,用于存储产出过剩的令牌,但是桶有一个最大容量。计算公式:maxPermits = maxBurstSeconds * permitsPerSecond。
  • nextFreeTicketMicros,下个请求可以被授权令牌的时间(不管请求多少令牌),这个属性是实现当前请求的债务由下个请求来偿还机制的关键。
  • storedPermits,已存储的令牌,生产过剩的令牌会存储在桶内,但是小于等于maxPermits,这个值是SmoothBursty限流器能够应对突然爆发量请求的关键。生产令牌计算公式:newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros。


SmoothBursty限流器时序图

创建限流器

创建限流器

请求令牌

请求令牌

  • 第6步和第7步是实现债务转移的关键,例如permitsPerSecond=2,则stableIntervalMicros=500000,nextFreeTicketMicros=0,storedPermits=0,A请求到达,请求令牌数4,根据第6步可知,A请求返回需要等待时间是0,由于storedPermits=0,则4个令牌需要生产,生产时间为4*500000=2000000微秒=2秒,然后把这2秒累加到nextFreeTicketMicros,nextFreeTicketMicros=2000000;B请求到达,请求令牌数1,返回的等待时间为2000000(这个就是A请求的债务)。


尝试请求令牌

尝试请求令牌

  • 第6步中,如果计算nextFreeTicketMicros与当前时间的差额大于传入的超时时间,则会立即返回,不会阻塞。


SmoothWarmingUp

SmoothWarmingUp 相对 SmoothBursty 来说主要区别在于 storedPermitsToWaitTime 方法。其他部分原理和 SmoothBursty 类似。


创建

SmoothWarmingUp 是 SmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:

static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;
    private double thresholdPermits;
    private double coldFactor;
    ...
}
复制代码

这四个参数都是和 SmoothWarmingUp 的“热身”(warmup)机制相关。warmup 可以用如下的图来表示:

*          ^ throttling
*          |
*    cold  +                  /
* interval |                 /.
*          |                / .
*          |               /  .   ← "warmup period" is the area of the trapezoid between
*          |              /   .     thresholdPermits and maxPermits
*          |             /    .
*          |            /     .
*          |           /      .
*   stable +----------/  WARM .
* interval |          .   UP  .
*          |          . PERIOD.
*          |          .       .
*        0 +----------+-------+--------------→ storedPermits
*          0 thresholdPermits maxPermits
复制代码


上图中横坐标是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUp 将 storedPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。纵坐标是请求的间隔时间,stableInterval 就是 1 / QPS,例如设置的 QPS 为5,则 stableInterval 就是200ms,coldInterval = stableInterval * coldFactor,这里的 coldFactor "hard-code"写死的是3。

当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits;当系统请求增多,图像会像左移动,直到 storedPermits 为0。


storedPermitsToWaitTime方法

上面"矩形+梯形"图像的面积就是 waitMicros 也即是本次请求需要等待的时间。计算过程在 SmoothWarmingUp 类的 storedPermitsToWaitTime 方法中覆写:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    // measuring the integral on the right part of the function (the climbing line)
    if (availablePermitsAboveThreshold > 0.0) { // 如果当前 storedPermits 超过 availablePermitsAboveThreshold 则计算从 超过部分拿令牌所需要的时间(图中的 WARM UP PERIOD)
        // WARM UP PERIOD 部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2”
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length = permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0); // 计算出从 WARM UP PERIOD 拿走令牌的时间
        permitsToTake -= permitsAboveThresholdToTake; // 剩余的令牌从 stable 部分拿
    }
    // measuring the integral on the left part of the function (the horizontal line)
    micros += (stableIntervalMicros * permitsToTake); // stable 部分令牌获取花费的时间
    return micros;
}
// WARM UP PERIOD 部分 获取相应令牌所对应的的时间
private double permitsToTime(double permits) {
    return stableIntervalMicros + permits * slope;
}
复制代码

SmoothWarmingUp 类中 storedPermitsToWaitTime 方法将 permitsToTake 分为两部分,一部分从 WARM UP PERIOD 部分拿,这部分是一个梯形,面积计算就是(上底 + 下底)* 高 / 2。另一部分从 stable 部分拿,它是一个长方形,面积就是 长 * 宽。最后返回两个部分的时间总和。


参考链接



目录
相关文章
|
2月前
|
XML Java 开发者
Spring Boot开箱即用可插拔实现过程演练与原理剖析
【11月更文挑战第20天】Spring Boot是一个基于Spring框架的项目,其设计目的是简化Spring应用的初始搭建以及开发过程。Spring Boot通过提供约定优于配置的理念,减少了大量的XML配置和手动设置,使得开发者能够更专注于业务逻辑的实现。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,为开发者提供一个全面的理解。
38 0
|
19天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
69 14
|
2月前
|
XML Java 数据安全/隐私保护
Spring Aop该如何使用
本文介绍了AOP(面向切面编程)的基本概念和术语,并通过具体业务场景演示了如何在Spring框架中使用Spring AOP。文章详细解释了切面、连接点、通知、切点等关键术语,并提供了完整的示例代码,帮助读者轻松理解和应用Spring AOP。
Spring Aop该如何使用
|
2月前
|
监控 安全 Java
什么是AOP?如何与Spring Boot一起使用?
什么是AOP?如何与Spring Boot一起使用?
72 5
|
2月前
|
Java 开发者 Spring
深入解析:Spring AOP的底层实现机制
在现代软件开发中,Spring框架的AOP(面向切面编程)功能因其能够有效分离横切关注点(如日志记录、事务管理等)而备受青睐。本文将深入探讨Spring AOP的底层原理,揭示其如何通过动态代理技术实现方法的增强。
77 8
|
2月前
|
Java 开发者 Spring
Spring AOP 底层原理技术分享
Spring AOP(面向切面编程)是Spring框架中一个强大的功能,它允许开发者在不修改业务逻辑代码的情况下,增加额外的功能,如日志记录、事务管理等。本文将深入探讨Spring AOP的底层原理,包括其核心概念、实现方式以及如何与Spring框架协同工作。
|
2月前
|
XML 监控 安全
深入调查研究Spring AOP
【11月更文挑战第15天】
49 5
|
2月前
|
Java 开发者 Spring
Spring AOP深度解析:探秘动态代理与增强逻辑
Spring框架中的AOP(Aspect-Oriented Programming,面向切面编程)功能为开发者提供了一种强大的工具,用以将横切关注点(如日志、事务管理等)与业务逻辑分离。本文将深入探讨Spring AOP的底层原理,包括动态代理机制和增强逻辑的实现。
49 4
|
2月前
|
Java Spring
[Spring]aop的配置与使用
本文介绍了AOP(面向切面编程)的基本概念和核心思想。AOP是Spring框架的核心功能之一,通过动态代理在不修改原代码的情况下注入新功能。文章详细解释了连接点、切入点、通知、切面等关键概念,并列举了前置通知、后置通知、最终通知、异常通知和环绕通知五种通知类型。
45 1
|
2月前
|
安全 Java 测试技术
Java开发必读,谈谈对Spring IOC与AOP的理解
Spring的IOC和AOP机制通过依赖注入和横切关注点的分离,大大提高了代码的模块化和可维护性。IOC使得对象的创建和管理变得灵活可控,降低了对象之间的耦合度;AOP则通过动态代理机制实现了横切关注点的集中管理,减少了重复代码。理解和掌握这两个核心概念,是高效使用Spring框架的关键。希望本文对你深入理解Spring的IOC和AOP有所帮助。
46 0