RateLimiter
RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。
令牌桶算法
令牌桶算法概念如下:
- 令牌以固定速率生成;
- 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
- 如果桶空了,那么尝试取令牌的请求会被直接丢弃。
令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。
RateLimiter、Semaphore 区别
RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。与Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。(注意尽管并发性和速率是紧密相关的,比如参考Little定律)
RateLimiter 使用
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>22.0</version> </dependency>
RateLimiter方法摘要
修饰符和类型 | 方法和描述 |
double | acquire() 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | acquire(int permits) 从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | create(double permitsPerSecond) 根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) 根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | getRate() 返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | setRate(double permitsPerSecond) 更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString() 返回对象的字符表现形式 |
boolean | tryAcquire() 从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits) 从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit) 从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit) 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |
RateLimiter:结合Spring Aop应用
依赖
<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency>
代码实现
定义注解
@Inherited @Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RateLimit { double limitNum() default 20; //默认每秒放入桶中的token }
封装定义返回结果
public class MyResult { private Integer status; private String msg; private List<Object> data; public MyResult(Integer status, String msg, List<Object> data) { this.status = status; this.msg = msg; this.data = data; } public static MyResult OK(String msg, List<Object> data) { return new MyResult(200, msg, data); } public static MyResult Error(Integer status, String msg) { return new MyResult(status, msg, null); }
aop实现
@Component @Scope @Aspect public class RateLimitAspect { private Logger log = LoggerFactory.getLogger(this.getClass()); //用来存放不同接口的RateLimiter(key为接口名称,value为RateLimiter) //在有RBAC框架的项目中,这个地方可以使用加载数据库配置的办法 private ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>(); private static ObjectMapper objectMapper = new ObjectMapper(); private RateLimiter rateLimiter; @Autowired private HttpServletResponse response; @Pointcut("@annotation(com.icat.retalimitaop.annotation.RateLimit)") public void serviceLimit() { } @Around("serviceLimit()") public Object around(ProceedingJoinPoint joinPoint) throws NoSuchMethodException { Object obj = null; //获取拦截的方法名 Signature sig = joinPoint.getSignature(); //获取拦截的方法名 MethodSignature msig = (MethodSignature) sig; //返回被织入增加处理目标对象 Object target = joinPoint.getTarget(); //为了获取注解信息 Method currentMethod = target.getClass().getMethod(msig.getName(), msig.getParameterTypes()); //获取注解信息 RateLimit annotation = currentMethod.getAnnotation(RateLimit.class); double limitNum = annotation.limitNum(); //获取注解每秒加入桶中的token String functionName = msig.getName(); // 注解所在方法名区分不同的限流策略 //获取rateLimiter if(map.containsKey(functionName)){ rateLimiter = map.get(functionName); }else { map.put(functionName, RateLimiter.create(limitNum)); rateLimiter = map.get(functionName); } try { if (rateLimiter.tryAcquire()) { //执行方法 obj = joinPoint.proceed(); } else { //拒绝了请求(服务降级) String result = objectMapper.writeValueAsString(MyResult.Error(500, "系统繁忙!")); log.info("拒绝了请求:" + result); outErrorResult(result); } } catch (Throwable throwable) { throwable.printStackTrace(); } return obj; } //将结果返回 public void outErrorResult(String result) { response.setContentType("application/json;charset=UTF-8"); try (ServletOutputStream outputStream = response.getOutputStream()) { outputStream.write(result.getBytes("utf-8")); } catch (IOException e) { e.printStackTrace(); } } static { objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); } }
接口限流
// 每秒限流5个 @RateLimit(limitNum = 5.0) public MyResult getResults() { log.info("调用了方法getResults"); return MyResult.OK("调用了方法", null); } // 每秒限流10个 @RateLimit(limitNum = 10.0) public MyResult getResultTwo() { log.info("调用了方法getResultTwo"); return MyResult.OK("调用了方法getResultTwo", null); }
RateLimiter 核心思想
RateLimiter内部有两个实现:SmoothBursty和SmoothWarmingUp
RateLimiter主要的类的类图
SmoothBursty
- SmoothBursty限流器使用令牌桶算法实现,这个限流器在空闲时候能够存储一定的令牌(默认是1秒钟时间产生的令牌),可以应对空闲一段时间后突然的爆发量请求。
- guava的RateLimiter有一个核心的设计思想:当前请求的债务(请求的令牌大于限流器存储的令牌数)由下一个请求来偿还(上个请求亏欠的令牌,下个请求需要等待亏欠令牌生产出来以后才能被授权)。
SmoothBursty特点:
- 令牌桶限流器:使用令牌桶算法实现的限流器。
- 稳定限流:当流量饱和时,限流器会按照固定的速率来生产令牌,从而请求按照固定的等待时间来执行。
- 能应对爆发流量:当限流器被闲置一段时间后,会存储1秒钟生产的令牌,这时如果突然有爆发流量到来,则直接从桶中获取令牌而无需等待。
SmoothBursty关键属性
- stableIntervalMicros,稳定生成令牌的时间间隔(微秒),公式为:stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond。
- maxBurstSeconds,能够存储多少秒生产的令牌,默认是1秒。
- maxPermits,最大能够存储的令牌数,限流器随着时间推进,会不断的生产令牌,可以想象为有一个桶,用于存储产出过剩的令牌,但是桶有一个最大容量。计算公式:maxPermits = maxBurstSeconds * permitsPerSecond。
- nextFreeTicketMicros,下个请求可以被授权令牌的时间(不管请求多少令牌),这个属性是实现当前请求的债务由下个请求来偿还机制的关键。
- storedPermits,已存储的令牌,生产过剩的令牌会存储在桶内,但是小于等于maxPermits,这个值是SmoothBursty限流器能够应对突然爆发量请求的关键。生产令牌计算公式:newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros。
SmoothBursty限流器时序图
创建限流器
创建限流器
请求令牌
请求令牌
- 第6步和第7步是实现债务转移的关键,例如permitsPerSecond=2,则stableIntervalMicros=500000,nextFreeTicketMicros=0,storedPermits=0,A请求到达,请求令牌数4,根据第6步可知,A请求返回需要等待时间是0,由于storedPermits=0,则4个令牌需要生产,生产时间为4*500000=2000000微秒=2秒,然后把这2秒累加到nextFreeTicketMicros,nextFreeTicketMicros=2000000;B请求到达,请求令牌数1,返回的等待时间为2000000(这个就是A请求的债务)。
尝试请求令牌
尝试请求令牌
- 第6步中,如果计算nextFreeTicketMicros与当前时间的差额大于传入的超时时间,则会立即返回,不会阻塞。
SmoothWarmingUp
SmoothWarmingUp 相对 SmoothBursty 来说主要区别在于 storedPermitsToWaitTime 方法。其他部分原理和 SmoothBursty 类似。
创建
SmoothWarmingUp 是 SmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:
static final class SmoothWarmingUp extends SmoothRateLimiter { private final long warmupPeriodMicros; /** * The slope of the line from the stable interval (when permits == 0), to the cold interval * (when permits == maxPermits) */ private double slope; private double thresholdPermits; private double coldFactor; ... } 复制代码
这四个参数都是和 SmoothWarmingUp 的“热身”(warmup)机制相关。warmup 可以用如下的图来表示:
* ^ throttling * | * cold + / * interval | /. * | / . * | / . ← "warmup period" is the area of the trapezoid between * | / . thresholdPermits and maxPermits * | / . * | / . * | / . * stable +----------/ WARM . * interval | . UP . * | . PERIOD. * | . . * 0 +----------+-------+--------------→ storedPermits * 0 thresholdPermits maxPermits 复制代码
上图中横坐标是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUp 将 storedPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。纵坐标是请求的间隔时间,stableInterval 就是 1 / QPS
,例如设置的 QPS 为5,则 stableInterval 就是200ms,coldInterval = stableInterval * coldFactor
,这里的 coldFactor "hard-code"写死的是3。
当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits;当系统请求增多,图像会像左移动,直到 storedPermits 为0。
storedPermitsToWaitTime方法
上面"矩形+梯形"图像的面积就是 waitMicros 也即是本次请求需要等待的时间。计算过程在 SmoothWarmingUp 类的 storedPermitsToWaitTime 方法中覆写:
@Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { // 如果当前 storedPermits 超过 availablePermitsAboveThreshold 则计算从 超过部分拿令牌所需要的时间(图中的 WARM UP PERIOD) // WARM UP PERIOD 部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2” double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // TODO(cpovirk): Figure out a good name for this variable. double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); // 计算出从 WARM UP PERIOD 拿走令牌的时间 permitsToTake -= permitsAboveThresholdToTake; // 剩余的令牌从 stable 部分拿 } // measuring the integral on the left part of the function (the horizontal line) micros += (stableIntervalMicros * permitsToTake); // stable 部分令牌获取花费的时间 return micros; } // WARM UP PERIOD 部分 获取相应令牌所对应的的时间 private double permitsToTime(double permits) { return stableIntervalMicros + permits * slope; } 复制代码
SmoothWarmingUp 类中 storedPermitsToWaitTime 方法将 permitsToTake 分为两部分,一部分从 WARM UP PERIOD 部分拿,这部分是一个梯形,面积计算就是(上底 + 下底)* 高 / 2。另一部分从 stable 部分拿,它是一个长方形,面积就是 长 * 宽。最后返回两个部分的时间总和。
参考链接
- https://guava.dev/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html
- https://blog.csdn.net/chen888999/article/details/82254694
- https://juejin.im/post/5c7510f3518825625e4ae41b