4. 滑动日志算法
滑动日志算法是实现限流的另一种方法,这种方法比较简单。基本逻辑就是记录下所有的请求时间点,新请求到来时先判断最近指定时间范围内的请求数量是否超过指定阈值,由此来确定是否达到限流,这种方式没有了时间窗口突变的问题,限流比较准确,但是因为要记录下每次请求的时间点,所以占用的内存较多。
4.1. 代码实现
下面是简单实现的 一个滑动日志算法,因为滑动日志要每次请求单独存储一条记录,可能占用内存过多。所以下面这个实现其实不算严谨的滑动日志,更像一个把 1 秒时间切分成 1000 个时间窗口的滑动窗口算法。
package com.wdbyte.rate.limiter; import java.time.LocalTime; import java.util.HashSet; import java.util.Set; import java.util.TreeMap; /** * 滑动日志方式限流 * 设置 QPS 为 2. * * @author https://www.wdbyte.com */ public class RateLimiterSildingLog { /** * 阈值 */ private Integer qps = 2; /** * 记录请求的时间戳,和数量 */ private TreeMap<Long, Long> treeMap = new TreeMap<>(); /** * 清理请求记录间隔, 60 秒 */ private long claerTime = 60 * 1000; public RateLimiterSildingLog(Integer qps) { this.qps = qps; } public synchronized boolean tryAcquire() { long now = System.currentTimeMillis(); // 清理过期的数据老数据,最长 60 秒清理一次 if (!treeMap.isEmpty() && (treeMap.firstKey() - now) > claerTime) { Set<Long> keySet = new HashSet<>(treeMap.subMap(0L, now - 1000).keySet()); for (Long key : keySet) { treeMap.remove(key); } } // 计算当前请求次数 int sum = 0; for (Long value : treeMap.subMap(now - 1000, now).values()) { sum += value; } // 超过QPS限制,直接返回 false if (sum + 1 > qps) { return false; } // 记录本次请求 if (treeMap.containsKey(now)) { treeMap.compute(now, (k, v) -> v + 1); } else { treeMap.put(now, 1L); } return sum <= qps; } public static void main(String[] args) throws InterruptedException { RateLimiterSildingLog rateLimiterSildingLog = new RateLimiterSildingLog(3); for (int i = 0; i < 10; i++) { Thread.sleep(250); LocalTime now = LocalTime.now(); if (rateLimiterSildingLog.tryAcquire()) { System.out.println(now + " 做点什么"); } else { System.out.println(now + " 被限流"); } } } }
代码中把阈值 QPS 设定为 3,运行可以得到如下日志:
20:51:17.395087 做点什么 20:51:17.653114 做点什么 20:51:17.903543 做点什么 20:51:18.154104 被限流 20:51:18.405497 做点什么 20:51:18.655885 做点什么 20:51:18.906177 做点什么 20:51:19.158113 被限流 20:51:19.410512 做点什么 20:51:19.661629 做点什么
5. 漏桶算法
漏桶算法中的漏桶是一个形象的比喻,这里可以用生产者消费者模式进行说明,请求是一个生产者,每一个请求都如一滴水,请求到来后放到一个队列(漏桶)中,而桶底有一个孔,不断的漏出水滴,就如消费者不断的在消费队列中的内容,消费的速率(漏出的速度)等于限流阈值。即假如 QPS 为 2,则每 1s / 2= 500ms
消费一次。漏桶的桶有大小,就如队列的容量,当请求堆积超过指定容量时,会触发拒绝策略。
下面是漏桶算法的示意图。
漏桶算法
由介绍可以知道,漏桶模式中的消费处理总是能以恒定的速度进行,可以很好的保护自身系统不被突如其来的流量冲垮;但是这也是漏桶模式的缺点,假设 QPS 为 2,同时 2 个请求进来,2 个请求并不能同时进行处理响应,因为每 1s / 2= 500ms
只能处理一个请求。
6. 令牌桶算法
令牌桶算法同样是实现限流是一种常见的思路,最为常用的 Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现。令牌桶的实现思路类似于生产者和消费之间的关系。
系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 2,每 500ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
请求执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝本次请求,由此达到限流目的。
下面是令牌桶限流算法示意图。
令牌桶算法
思考令牌桶的实现可以以下特点。
- 1s / 阈值(QPS) = 令牌添加时间间隔。
- 桶的容量等于限流的阈值,令牌数量达到阈值时,不再添加。
- 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。
- 有启动过程,令牌桶启动时桶中无令牌,然后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这类问题。
6.1. 代码实现
Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现,日常开发中我们也不会手动实现了,这里直接使用 RateLimiter 进行测试。
引入依赖:
<exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.0.1-jre</version> </exclusion>
RateLimiter 限流体验:
// qps 2 RateLimiter rateLimiter = RateLimiter.create(2); for (int i = 0; i < 10; i++) { String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME); System.out.println(time + ":" + rateLimiter.tryAcquire()); Thread.sleep(250); }
代码中限制 QPS 为 2,也就是每隔 500ms 生成一个令牌,但是程序每隔 250ms 获取一次令牌,所以两次获取中只有一次会成功。
17:19:06.797557:true 17:19:07.061419:false 17:19:07.316283:true 17:19:07.566746:false 17:19:07.817035:true 17:19:08.072483:false 17:19:08.326347:true 17:19:08.577661:false 17:19:08.830252:true 17:19:09.085327:false
6.2. 思考
虽然演示了 Google Guava 工具包中的 RateLimiter 的实现,但是我们需要思考一个问题,就是令牌的添加方式,如果按照指定间隔添加令牌,那么需要开一个线程去定时添加,如果有很多个接口很多个 RateLimiter 实例,线程数会随之增加,这显然不是一个好的办法。显然 Google 也考虑到了这个问题,在 RateLimiter 中,是在每次令牌获取时才进行计算令牌是否足够的。它通过存储的下一个令牌生成的时间,和当前获取令牌的时间差,再结合阈值,去计算令牌是否足够,同时再记录下一个令牌的生成时间以便下一次调用。
下面是 Guava 中 RateLimiter 类的子类 SmoothRateLimiter 的 resync()
方法的代码分析,可以看到其中的令牌计算逻辑。
void resync(long nowMicros) { // 当前微秒时间 // 当前时间是否大于下一个令牌生成时间 if (nowMicros > this.nextFreeTicketMicros) { // 可生成的令牌数 newPermits = (当前时间 - 下一个令牌生成时间)/ 令牌生成时间间隔。 // 如果 QPS 为2,这里的 coolDownIntervalMicros 就是 500000.0 微秒(500ms) double newPermits = (double)(nowMicros - this.nextFreeTicketMicros) / this.coolDownIntervalMicros(); // 更新令牌库存 storedPermits。 this.storedPermits = Math.min(this.maxPermits, this.storedPermits + newPermits); // 更新下一个令牌生成时间 nextFreeTicketMicros this.nextFreeTicketMicros = nowMicros; } }