限流器的思路和算法
如果让你来造一个限流器,有啥想法?
漏桶算法
- 用一个固定大小的队列。比如设置限流为5qps,1s可以接受5个请求;那我们就造一个大小为5的队列,如果队列为满了,就拒绝请求;如果队列未满,就往队列添加请求。
令牌算法
令牌听起来挺酷的。以固定的速率往桶里发放令牌。然后消费者每次要取到令牌(acquire)才可以响应请求。
由于令牌是固定间隔发放的,假设还是5qps,如果我有1s内没有请求,我的令牌桶就满了,可以一瞬间响应5个请求(一次过取5个令牌),也就是可以应对瞬时流量。
那么这里也涉及一个固定间隔发放的问题,难道也是需要定时任务往”桶里“放令牌吗?
那我们来看下Guava怎么搞的,假设限流为2qps,那么固定发放令牌的时间stableIntervalMicros就是500ms,初始化的storedPermits当前桶里的令牌数是0。
RateLimiter限流器
RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如:你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。
com.google.common.util.concurrent.RateLimiter @ThreadSafe @Beta public abstract class RateLimiter extends Object 复制代码
RateLimiter的作用
RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。与Semaphore 相比,Semaphore 限制了并发访问的数量而不是使用速率。(注意尽管并发性和速率是紧密相关的,比如参考Little定律)
通过设置许可证的速率来定义RateLimiter。在默认配置下,许可证会在固定的速率下被分配,速率单位是每秒多少个许可证。为了确保维护配置的速率,许可会被平稳地分配,许可之间的延迟会做调整。
可能存在配置一个拥有预热期的RateLimiter 的情况,在这段时间内,每秒分配的许可数会稳定地增长直到达到稳定的速率。
举例来说明如何使用RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:
//速率是每秒两个许可 final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // 也许需要等待 executor.execute(task); } } 复制代码
再举另外一个例子,想象下我们制造了一个数据流,并希望以每秒5kb的速率处理它。可以通过要求每个字节代表一个许可,然后指定每秒5000个许可来完成:
// 每秒5000个许可 final RateLimiter rateLimiter = RateLimiter.create(5000.0); void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); } 复制代码
有一点很重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制。
如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter并不提供公平性的保证。
create方法
- create(double permitsPerSecond, long warmupPeriod, TimeUnit unit):根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
- create(double permitsPerSecond):根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
acquire方法
- acquire():从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
public double acquire() 复制代码
从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求。如果存在等待的情况的话,告诉调用者获取到该请求所需要的睡眠时间。该方法等同于acquire(1)。
- 返回值
执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
- acquire(int permits):从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求。
public double acquire(int permits) 复制代码
从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求数。如果存在等待的情况的话,告诉调用者获取到这些请求数所需要的睡眠时间。
- 参数:permits – 需要获取的许可数
- 返回:执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
- 抛出:IllegalArgumentException – 如果请求的许可数为负数或者为0
tryAcquire方法
- tryAcquire():从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
- tryAcquire(int permits):从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
- tryAcquire(int permits, long timeout, TimeUnit unit):从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
- tryAcquire(long timeout, TimeUnit unit):从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)
详细分析
public static RateLimiter create(double permitsPerSecond) 复制代码
根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)。
返回的RateLimiter 确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。
当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。
当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。
参数
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
抛出异常
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) 复制代码
根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率(只要存在足够请求数来使其饱和)。
同样地,如果RateLimiter 在warmupPeriod时间内闲置不用,它将会逐步地返回冷却状态。
它会像它第一次被创建般经历同样的预热期。返回的RateLimiter 主要用于那些需要预热期的资源,这些资源实际上满足了请求(比如一个远程服务),而不是在稳定(最大)的速率下可以立即被访问的资源。返回的RateLimiter 在冷却状态下启动(即预热期将会紧跟着发生),并且如果被长期闲置不用,它将回到冷却状态。
参数
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
- warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
- unit – 参数warmupPeriod 的时间单位
抛出异常
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
实践案例
第1次获取10个令牌
- nowMicro是刚开始运行的时间,是一个很小的数,约等于0;
- resync(nowMicro),更新令牌数,由于nowMicro约等于0,其实令牌数不会更新((0-0)/5000 = 0),令牌数还是0(约等于0)
- storedPermitsToSpend,其实当前并没有令牌,所以取min,约等于0;
- freshPermits,需要预支付10个令牌,约等于10;
- 预支付之后需要等待10 * interval = 10 * 500 ,约等于5000ms,5000000微秒
- this.nextFreeTicketMicros 需要加上 waitMicros 也就是 下一次可以获得令牌的时间是5000ms之后。
- 所以我们看到输出信息的第一行在第0s获取了10个令牌之后,下一次再想获取1个令牌需要等待5000ms也就是5s。
第2次获取1个令牌
然后再一次想获取1个令牌,当前时间还是约等于0,这时候resync,nowMicros(0)比nextFreeTicketMicros(5000)小,令牌不更新。
returnValue=5000,storedPermitsToSpend=0,freshPermits=1,需要再等 waitMicros=1 * 500ms,然后nextFreeTicketMicros更新为5000+500=5500,返回returnValue=5000;外层函数睡眠5000ms,返回5000(输出打印获取1个token,约5s)
第3次获取10个令牌
上面说的,睡了5000ms,当前时间nowMicros=5000;
resync,nowMicros(5000)比nextFreeTicketMicros(5500)小,令牌不更新,还是欠费状态,只能预支付。
returnValue=5500, storedPermitsToSpend=0,freshPermits=10,需要预支付10个令牌, waitMicros=10 * 500ms = 5000,然后nextFreeTicketMicros更新为5500+5000=10500,返回returnValue=5500;外层函数睡眠5500-5000=500ms,返回500(输出打印获取10个token,约0.5s)