计数器限流算法
我们可以直接通过一个计数器,限制每一秒钟能够接收的请求数。比如说 qps定为 1000,那么实现思路就是从第一个请求进来开始计时,在接下去的 1s 内,每来一个请求,就把计数加 1,如果累加的数字达到了 1000,那么后续的请求就会被全部拒绝。等到 1s 结束后,把计数恢复成 0 ,重新开始计数。
优点:实现简单
缺点:如果1s 内的前半秒,已经通过了 1000 个请求,那后面的半秒只能请求拒绝,我们把这种现象称为“突刺现象”。
实现代码案例:
public class Counter { public long timeStamp = getNowTime(); public int reqCount = 0; public final int limit = 100; // 时间窗口内最大请求数 public final long interval = 1000; // 时间窗口ms public boolean limit() { long now = getNowTime(); if (now < timeStamp + interval) { // 在时间窗口内 reqCount++; // 判断当前时间窗口内是否超过最大请求控制数 return reqCount <= limit; } else { timeStamp = now; // 超时后重置 reqCount = 1; return true; } } public long getNowTime() { return System.currentTimeMillis(); } }
滑动时间窗算法
滑动窗口,又称 Rolling Window。为了解决计数器算法的缺陷,我们引入了滑动窗口算法。下面这张图,很好地解释了滑动窗口算法:
在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。
那么滑动窗口怎么解决刚才的临界问题的呢?我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。
我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。
由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
实现代码案例:
public class SlideWindow { /** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */ private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>(); private SlideWindow() {} public static void main(String[] args) throws InterruptedException { while (true) { // 任意10秒内,只允许2次通过 System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L)); // 睡眠0-10秒 Thread.sleep(1000 * new Random().nextInt(10)); } } /** * 滑动时间窗口限流算法 * 在指定时间窗口,指定限制次数内,是否允许通过 * * @param listId 队列id * @param count 限制次数 * @param timeWindow 时间窗口大小 * @return 是否允许通过 */ public static synchronized boolean isGo(String listId, int count, long timeWindow) { // 获取当前时间 long nowTime = System.currentTimeMillis(); // 根据队列id,取出对应的限流队列,若没有则创建 List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>()); // 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置 if (list.size() < count) { list.add(0, nowTime); return true; } // 队列已满(达到限制次数),则获取队列中最早添加的时间戳 Long farTime = list.get(count - 1); // 用当前时间戳 减去 最早添加的时间戳 if (nowTime - farTime <= timeWindow) { // 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count // 不允许通过 return false; } else { // 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count // 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置 list.remove(count - 1); list.add(0, nowTime); return true; } } }
在 Sentinel 中 通过 LeapArray
结构来实现时间窗算法, 它的核心代码如下(只列举获取时间窗方法):
/** * 获取当前的时间窗 * * Get bucket item at provided timestamp. * * @param timeMillis a valid timestamp in milliseconds * @return current bucket item at provided timestamp if the time is valid; null if time is invalid */ public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. // 计算窗口的开始时间,计算每个格子的开始时间 long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { WindowWrap<T> old = array.get(idx); // 如果没有窗格,创建窗格 if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 当前窗格存在,返回历史窗格 } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; // } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. // 清空所有的窗格数据 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 如果时钟回拨,重新创建时间格 } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }