Go 分布式令牌桶限流 + 兜底保障

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Go 分布式令牌桶限流 + 兜底保障

上篇文章提到固定时间窗口限流无法处理突然请求洪峰情况,本文讲述的令牌桶线路算法则可以比较好的处理此场景。

工作原理

  1. 单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。
  2. 处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。

优缺点

优点

可以有效处理瞬间的突发流量,桶内存量 token 即可作为流量缓冲区平滑处理突发流量。

缺点

实现较为复杂。

代码实现

core/limit/tokenlimit.go

分布式环境下考虑使用 redis 作为桶和令牌的存储容器,采用 lua 脚本实现整个算法流程。

redis lua 脚本

-- 每秒生成token数量即token生成速度
local rate = tonumber(ARGV[1])
-- 桶容量
local capacity = tonumber(ARGV[2])
-- 当前时间戳
local now = tonumber(ARGV[3])
-- 当前请求token数量
local requested = tonumber(ARGV[4])
-- 需要多少秒才能填满桶
local fill_time = capacity/rate
-- 向下取整,ttl为填满时间的2倍
local ttl = math.floor(fill_time*2)
-- 当前时间桶容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量
if last_tokens == nil then
last_tokens = capacity
end
-- 上一次刷新的时间
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- 第一次进入则设置刷新时间为0
if last_refreshed == nil then
last_refreshed = 0
end
-- 距离上次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
-- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 本次请求token数量是否足够
local allowed = filled_tokens >= requested
-- 桶剩余数量
local new_tokens = filled_tokens
-- 允许本次token申请,计算剩余数量
if allowed then
new_tokens = filled_tokens - requested
end
-- 设置剩余token数量
redis.call("setex", KEYS[1], ttl, new_tokens)
-- 设置刷新时间
redis.call("setex", KEYS[2], ttl, now)
return allowed

令牌桶限流器定义

type TokenLimiter struct {
    // 每秒生产速率
    rate int
    // 桶容量
    burst int
    // 存储容器
    store *redis.Redis
    // redis key
    tokenKey       string
    // 桶刷新时间key
    timestampKey   string
    // lock
    rescueLock     sync.Mutex
    // redis健康标识
    redisAlive     uint32
    // redis故障时采用进程内 令牌桶限流器
    rescueLimiter  *xrate.Limiter
    // redis监控探测任务标识
    monitorStarted bool
}
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)
    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

获取令牌

func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
    // 判断redis是否健康
    // redis故障时采用进程内限流器
    // 兜底保障
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }
    // 执行脚本获取令牌
    resp, err := lim.store.Eval(
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    // 特殊处理key不存在的情况
    if err == redis.Nil {
        return false
    } else if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 执行异常,开启redis健康探测任务
        // 同时采用进程内限流器作为兜底
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }
    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }
    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

redis 故障时兜底策略

兜底策略的设计考虑得非常细节,当 redis 不可用的时候,启动单机版的 ratelimit 做备用限流,确保基本的限流可用,服务不会被冲垮。

// 开启redis健康探测
func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()
    // 防止重复开启
    if lim.monitorStarted {
        return
    }
    // 设置任务和健康标识
    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)
    // 健康探测
    go lim.waitForRedis()
}
// redis健康探测定时任务
func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 健康探测成功时回调此函数
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()
    for range ticker.C {
        // ping属于redis内置健康探测命令
        if lim.store.Ping() {
            // 健康探测成功,设置健康标识
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

项目地址

https://github.com/zeromicro/go-zero

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
算法 Go
[go 面试] 雪花算法与分布式ID生成
[go 面试] 雪花算法与分布式ID生成
|
4月前
|
存储 调度
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
分布式锁设计问题之云存储的最佳实践中保障分布式锁的容错能力如何解决
|
6月前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
132 4
|
4月前
|
监控 Go API
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
|
4月前
|
运维 安全 Cloud Native
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
|
4月前
|
Go API 数据库
[go 面试] 分布式事务框架选择与实践
[go 面试] 分布式事务框架选择与实践
|
4月前
|
算法 Go 数据库
[go 面试] 并发与数据一致性:事务的保障
[go 面试] 并发与数据一致性:事务的保障
|
4月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)
|
4月前
|
Kubernetes Go 数据库
go-zero 分布式事务最佳实践
go-zero 分布式事务最佳实践
|
4月前
|
NoSQL Go Redis
用 Go + Redis 实现分布式锁
用 Go + Redis 实现分布式锁