简介
上一篇文章 限流实现1 已经介绍三种限流方案
- 随机拒流
- 计数器方式
- 基于滑动时间窗口限流
剩下的几种本来打算能立即写完,没想到一下三个月过去了,很是尴尬。本次主要实现如下两种算法
- 令牌桶算法
- 漏斗算法
算法的具体实现可以在github上查看 https://github.com/shidawuhen/asap/tree/master/controller/limit
下面先来介绍一下这两种算法
令牌桶算法
算法说明
令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。令牌桶算法示意图如下所示:
一般的流程为:
a. 按特定的速率向令牌桶投放令牌
b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不需要经过令牌桶的处理,直接发送;
c. 符合匹配规则的报文,则需要令牌桶进行处理。当桶中有足够的令牌则报文可以被继续发送下去,同时令牌桶中的令牌量按报文的长度做相应的减少;
d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才可以发送。这就可以限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。
大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。
令牌桶能允许突发,是因为令牌桶一般有两个值,一个是桶容量,一个是单位时间投放令牌量。如果桶容量大于令牌单位时间投放量,而且单位时间消耗量比投放量少,则令牌数量最终会达到桶容量最大值。此时如果大量请求到达,会将所有令牌消耗,实现了允许突发的效果。
算法实现
该算法有几个核心点
- 因为要更新令牌数量,所以需要加锁
- 定时向桶中放入令牌,有两种方式,一种是起goroutine,定时投放,另一种是在判断是否还有足够令牌的时候,根据投放情况进行投放。这次实现使用的是第二种方法,整个架构会更简单一些。
package limit
import (
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
// @Tags limit
// @Summary 令牌桶拒流
// @Produce json
// @Success 200 {string} string "成功会返回ok"
// @Failure 502 "失败返回reject"
// @Router /limit/tokenreject [get]
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //桶上次放token的时间戳 s
lock sync.Mutex
}
func (l *TokenBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
now := time.Now().Unix()
l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
if l.tokens > l.capacity {
l.tokens = l.capacity
}
l.lastTokenSec = now
if l.tokens > 0 {
// 还有令牌,领取令牌
l.tokens--
return true
} else {
// 没有令牌,则拒绝
return false
}
}
func (l *TokenBucket) Set(r, c int64) {
l.rate = r
l.capacity = c
l.tokens = 0
l.lastTokenSec = time.Now().Unix()
}
func CreateTokenBucket()*TokenBucket{
t := &TokenBucket{}
t.Set(1,5)
return t
}
var tokenBucket *TokenBucket = CreateTokenBucket()
func TokenReject(c *gin.Context) {
//fmt.Println(tokenBucket.tokens)
if !tokenBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}
漏桶算法
算法说明
漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
- 一个固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,则不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
- 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。
漏桶法限流很好理解,假设我们有一个水桶按固定的速率向下方滴落一滴水,无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。
示意图如下:
算法实现
查阅了一下相关资料,主要的算法实现有三种。学习完这几种实现后,我怀疑我是不是理解错了,感觉还没有计数拒流方便好用。如果我理解的有误,大家也可以告诉我。
这三种方式为:
令牌桶算法变种
桶的大小就是单位时间内能流出的最大量,这种就不写了。
计数拒流变种
该方法在指定时间将可用空间置为初始值。
package limit
import (
"fmt"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
type LeakyBucket struct {
// 容量
capacity int64
// 剩余大小
remaining int64
// 下一次的重置容量时间
reset time.Time
// 重置容量时间间隔
rate time.Duration
mutex sync.Mutex
}
func (b *LeakyBucket) Allow() bool {
b.mutex.Lock()
defer b.mutex.Unlock()
if time.Now().After(b.reset) { // 需要重置
b.reset = time.Now().Add(b.rate) // 更新时间
b.remaining = b.capacity // 重置剩余容量
}
fmt.Println(b.remaining)
if b.remaining > 0 { // 判断是否能过
b.remaining--
return true
}
return false
}
func (b *LeakyBucket) Set(r time.Duration, c int64) {
b.rate = r
b.capacity = c
b.remaining = c
b.reset = time.Now().Add(b.rate)
}
func CreateLeakyBucket(r time.Duration,c int64) *LeakyBucket {
t := &LeakyBucket{}
t.Set(r, c)
return t
}
var leakyBucket *LeakyBucket = CreateLeakyBucket(time.Second*2,10)
func LeakyReject(c *gin.Context) {
if !leakyBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}
真固定速率
这个算法的的实现,根据uber团队开源的 github.com/uber-go/ratelimit 而来。
该实现保证如果有大量请求,每一个请求会按照规定的时间间隔执行。如设置1s处理100个请求,则每10ms会处理一个。
如果长时间没有请求,仍会产生请求在短时间内被处理完毕的情况。当然对于这种情况可以很容易修复,大家可以思考一下如何修改。
package limit
import (
"fmt"
"github.com/andres-erbsen/clock"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
//真固定速率
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
type limiter struct {
sync.Mutex // 锁
last time.Time // 上一次的时刻
sleepFor time.Duration // 需要等待的时间
perRequest time.Duration // 每次的时间间隔
maxSlack time.Duration // 最大的富余量
clock Clock // 时钟
}
// Take 会阻塞确保两次请求之间的时间走完
// Take 调用平均数为 time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// 如果是第一次请求就直接放行
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor 根据 perRequest 和上一次请求的时刻计算应该sleep的时间
// 由于每次请求间隔的时间可能会超过perRequest, 所以这个数字可能为负数,并在多个请求之间累加
t.sleepFor += t.perRequest - now.Sub(t.last)
fmt.Println(t.sleepFor)
// 我们不应该让sleepFor负的太多,因为这意味着一个服务在短时间内慢了很多随后会得到更高的RPS。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// 如果 sleepFor 是正值那么就 sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
func NewLimiter(rate int) *limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
var rl = NewLimiter(100) // per second,每秒100个请求
func LeakyRejectFixedRate(c *gin.Context) {
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
c.String(http.StatusOK, "ok")
}
演示结果如下:
总结
学习了令牌桶和漏桶算法,结合这几年工作中的具体场景,感觉令牌桶算法的实用价值更大一些。
下面是令牌桶和漏桶对比:
- 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
- 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
- 令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
- 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
- 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;
- 两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。
最后,给大家展示一下各种限流算法的比较
资料
最后
大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)
我的个人博客为:https://shidawuhen.github.io/
往期文章回顾:
技术
- 秒杀系统
- 分布式系统与一致性协议
- 微服务之服务框架和注册中心
- Beego框架使用
- 浅谈微服务
- TCP性能优化
- 限流实现1
- Redis实现分布式锁
- Golang源码BUG追查
- 事务原子性、一致性、持久性的实现原理
- CDN请求过程详解
- 常用缓存技巧
- 如何高效对接第三方支付
- Gin框架简洁版
- InnoDB锁与事务简析
- 算法总结
读书笔记
思考