前言
最近几年,随着微服务的流行,服务和服务之间的依赖越来越强,调用关系越来越复杂,服务和服务之间的稳定性越来越重要。在遇到突发的请求量激增,恶意的用户访问,亦或请求频率过高给下游服务带来较大压力时,我们常常需要通过缓存、限流、熔断降级、负载均衡等多种方式保证服务的稳定性。其中限流是不可或缺的一环,这篇文章介绍限流相关知识。
1. 限流
限流顾名思义,就是对请求或并发数进行限制;通过对一个时间窗口内的请求量进行限制来保障系统的正常运行。如果我们的服务资源有限、处理能力有限,就需要对调用我们服务的上游请求进行限制,以防止自身服务由于资源耗尽而停止服务。
在限流中有两个概念需要了解。
- 阈值:在一个单位时间内允许的请求量。如 QPS 限制为10,说明 1 秒内最多接受 10 次请求。
- 拒绝策略:超过阈值的请求的拒绝策略,常见的拒绝策略有直接拒绝、排队等待等。
2. 固定窗口算法
固定窗口算法又叫计数器算法,是一种简单方便的限流算法。主要通过一个支持原子操作的计数器来累计 1 秒内的请求次数,当 1 秒内计数达到限流阈值时触发拒绝策略。每过 1 秒,计数器重置为 0 开始重新计数。
2.1. 代码实现
下面是简单的代码实现,QPS 限制为 2,这里的代码做了一些优化,并没有单独开一个线程去每隔 1 秒重置计数器,而是在每次调用时进行时间间隔计算来确定是否先重置计数器。
/** * @author https://www.wdbyte.com */ public class RateLimiterSimpleWindow { // 阈值 private static Integer QPS = 2; // 时间窗口(毫秒) private static long TIME_WINDOWS = 1000; // 计数器 private static AtomicInteger REQ_COUNT = new AtomicInteger(); private static long START_TIME = System.currentTimeMillis(); public synchronized static boolean tryAcquire() { if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) { REQ_COUNT.set(0); START_TIME = System.currentTimeMillis(); } return REQ_COUNT.incrementAndGet() <= QPS; } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { Thread.sleep(250); LocalTime now = LocalTime.now(); if (!tryAcquire()) { System.out.println(now + " 被限流"); } else { System.out.println(now + " 做点什么"); } } } }
运行结果:
20:53:43.038922 做点什么 20:53:43.291435 做点什么 20:53:43.543087 被限流 20:53:43.796666 做点什么 20:53:44.050855 做点什么 20:53:44.303547 被限流 20:53:44.555008 被限流 20:53:44.809083 做点什么 20:53:45.063828 做点什么 20:53:45.314433 被限流
从输出结果中可以看到大概每秒操作 3 次,由于限制 QPS 为 2,所以平均会有一次被限流。看起来可以了,不过我们思考一下就会发现这种简单的限流方式是有问题的,虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次。
固定窗口算法
简单修改测试代码,可以进行验证:
// 先休眠 400ms,可以更快的到达时间窗口。 Thread.sleep(400); for (int i = 0; i < 10; i++) { Thread.sleep(250); if (!tryAcquire()) { System.out.println("被限流"); } else { System.out.println("做点什么"); } }
得到输出中可以看到连续 4 次请求,间隔 250 ms 没有却被限制。:
20:51:17.395087 做点什么 20:51:17.653114 做点什么 20:51:17.903543 做点什么 20:51:18.154104 被限流 20:51:18.405497 做点什么 20:51:18.655885 做点什么 20:51:18.906177 做点什么 20:51:19.158113 被限流 20:51:19.410512 做点什么 20:51:19.661629 做点什么
3. 滑动窗口算法
我们已经知道固定窗口算法的实现方式以及它所存在的问题,而滑动窗口算法是对固定窗口算法的改进。既然固定窗口算法在遇到时间窗口的临界突变时会有问题,那么我们在遇到下一个时间窗口前也调整时间窗口不就可以了吗?
下面是滑动窗口的示意图。
滑动窗口算法
上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题。
3.1. 代码实现
下面是基于以上滑动窗口思路实现的简单的滑动窗口限流工具类。
package com.wdbyte.rate.limiter; import java.time.LocalTime; import java.util.concurrent.atomic.AtomicInteger; /** * 滑动窗口限流工具类 * * @author https://www.wdbyte.com */ public class RateLimiterSlidingWindow { /** * 阈值 */ private int qps = 2; /** * 时间窗口总大小(毫秒) */ private long windowSize = 1000; /** * 多少个子窗口 */ private Integer windowCount = 10; /** * 窗口列表 */ private WindowInfo[] windowArray = new WindowInfo[windowCount]; public RateLimiterSlidingWindow(int qps) { this.qps = qps; long currentTimeMillis = System.currentTimeMillis(); for (int i = 0; i < windowArray.length; i++) { windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0)); } } /** * 1. 计算当前时间窗口 * 2. 更新当前窗口计数 & 重置过期窗口计数 * 3. 当前 QPS 是否超过限制 * * @return */ public synchronized boolean tryAcquire() { long currentTimeMillis = System.currentTimeMillis(); // 1. 计算当前时间窗口 int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount)); // 2. 更新当前窗口计数 & 重置过期窗口计数 int sum = 0; for (int i = 0; i < windowArray.length; i++) { WindowInfo windowInfo = windowArray[i]; if ((currentTimeMillis - windowInfo.getTime()) > windowSize) { windowInfo.getNumber().set(0); windowInfo.setTime(currentTimeMillis); } if (currentIndex == i && windowInfo.getNumber().get() < qps) { windowInfo.getNumber().incrementAndGet(); } sum = sum + windowInfo.getNumber().get(); } // 3. 当前 QPS 是否超过限制 return sum <= qps; } private class WindowInfo { // 窗口开始时间 private Long time; // 计数器 private AtomicInteger number; public WindowInfo(long time, AtomicInteger number) { this.time = time; this.number = number; } // get...set... } }
下面是测试用例,设置 QPS 为 2,测试次数 20 次,每次间隔 300 毫秒,预计成功次数在 12 次左右。
public static void main(String[] args) throws InterruptedException { int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps; System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success)); success = 0; RateLimiterSlidingWindow myRateLimiter = new RateLimiterSlidingWindow(qps); for (int i = 0; i < count; i++) { Thread.sleep(sleep); if (myRateLimiter.tryAcquire()) { success++; if (success % qps == 0) { System.out.println(LocalTime.now() + ": success, "); } else { System.out.print(LocalTime.now() + ": success, "); } } else { System.out.println(LocalTime.now() + ": fail"); } } System.out.println(); System.out.println("实际测试成功次数:" + success); }
下面是测试的结果。
当前QPS限制为:2,当前测试次数:20,间隔:300ms,预计成功次数:12 16:04:27.077782: success, 16:04:27.380715: success, 16:04:27.684244: fail 16:04:27.989579: success, 16:04:28.293347: success, 16:04:28.597658: fail 16:04:28.901688: fail 16:04:29.205262: success, 16:04:29.507117: success, 16:04:29.812188: fail 16:04:30.115316: fail 16:04:30.420596: success, 16:04:30.725897: success, 16:04:31.028599: fail 16:04:31.331047: fail 16:04:31.634127: success, 16:04:31.939411: success, 16:04:32.242380: fail 16:04:32.547626: fail 16:04:32.847965: success, 实际测试成功次数:11