单机限流和分布式应用限流

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 单机限流和分布式应用限流

单机限流

大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃。此时你需要使用的技术手段之一就是限流,当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在限流时,常见的两种算法是漏桶和令牌桶算法算法。

单机限流算法主要有:令牌桶(Token Bucket)、漏桶(leaky bucket)和计数器算法是最常用的三种限流的算法。

1. 令牌桶算法

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。 当桶满时,新添加的令牌被丢弃或拒绝

public class RateLimiterDemo {
    private static RateLimiter limiter = RateLimiter.create(5);
    public static void exec() {
        limiter.acquire(1);
        try {
            // 处理核心逻辑
            TimeUnit.SECONDS.sleep(1);
            System.out.println("--" + System.currentTimeMillis() / 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Guava RateLimiter 提供了令牌桶算法可用于平滑突发限流策略。

该示例为每秒中产生5个令牌,每200毫秒会产生一个令牌。

limiter.acquire() 表示消费一个令牌。当桶中有足够的令牌时,则直接返回0,否则阻塞,直到有可用的令牌数才返回,返回的值为阻塞的时间。

2. 漏桶算法

它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量,数据可以以任意速度流入到漏桶中。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。 漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶为空,则不需要流出水滴,如果漏桶(包缓存)溢出,那么水滴会被溢出丢弃

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 漏斗限流算法
 *
 * @author dadiyang
 * @date 2018/9/28
 */
public class FunnelRateLimiter {
    private Map<String, Funnel> funnelMap = new ConcurrentHashMap<>();
    public static void main(String[] args) throws InterruptedException {
        FunnelRateLimiter limiter = new FunnelRateLimiter();
        int testAccessCount = 30;
        int capacity = 5;
        int allowQuota = 5;
        int perSecond = 30;
        int allowCount = 0;
        int denyCount = 0;
        for (int i = 0; i < testAccessCount; i++) {
            boolean isAllow = limiter.isActionAllowed("dadiyang", "doSomething", 5, 5, 30);
            if (isAllow) {
                allowCount++;
            } else {
                denyCount++;
            }
            System.out.println("访问权限:" + isAllow);
            Thread.sleep(1000);
        }
        System.out.println("报告:");
        System.out.println("漏斗容量:" + capacity);
        System.out.println("漏斗流动速率:" + allowQuota + "次/" + perSecond + "秒");
        System.out.println("测试次数=" + testAccessCount);
        System.out.println("允许次数=" + allowCount);
        System.out.println("拒绝次数=" + denyCount);
    }
    /**
     * 根据给定的漏斗参数检查是否允许访问
     *
     * @param username   用户名
     * @param action     操作
     * @param capacity   漏斗容量
     * @param allowQuota 每单个单位时间允许的流量
     * @param perSecond  单位时间(秒)
     * @return 是否允许访问
     */
    public boolean isActionAllowed(String username, String action, int capacity, int allowQuota, int perSecond) {
        String key = "funnel:" + action + ":" + username;
        if (!funnelMap.containsKey(key)) {
            funnelMap.put(key, new Funnel(capacity, allowQuota, perSecond));
        }
        Funnel funnel = funnelMap.get(key);
        return funnel.watering(1);
    }
    private static class Funnel {
        private int capacity;
        private float leakingRate;
        private int leftQuota;
        private long leakingTs;
        public Funnel(int capacity, int count, int perSecond) {
            this.capacity = capacity;
            // 因为计算使用毫秒为单位的
            perSecond *= 1000;
            this.leakingRate = (float) count / perSecond;
        }
        /**
         * 根据上次水流动的时间,腾出已流出的空间
         */
        private void makeSpace() {
            long now = System.currentTimeMillis();
            long time = now - leakingTs;
            int leaked = (int) (time * leakingRate);
            if (leaked < 1) {
                return;
            }
            leftQuota += leaked;
            // 如果剩余大于容量,则剩余等于容量
            if (leftQuota > capacity) {
                leftQuota = capacity;
            }
            leakingTs = now;
        }
        /**
         * 漏斗漏水
         *
         * @param quota 流量
         * @return 是否有足够的水可以流出(是否允许访问)
         */
        public boolean watering(int quota) {
            makeSpace();
            int left = leftQuota - quota;
            if (left >= 0) {
                leftQuota = left;
                return true;
            }
            return false;
        }
    }
}

3. 计数器限流算法

数器限流算法也是比较常用的,主要用来限制总并发数,比如数据库连接池大小、线程池大小、程序访问并发数等都是使用计数器算法。

public class CountRateLimiterDemo {
    private static Semaphore semphore = new Semaphore(5);
    public static void exec() {
        if(semphore.getQueueLength()>100){
            System.out.println("当前等待排队的任务数大于100,请稍候再试...");
        }
        try {
            semphore.acquire();
            // 处理核心逻辑
            TimeUnit.SECONDS.sleep(1);
            System.out.println("--" + System.currentTimeMillis() / 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semphore.release();
        }
    }
}

使用Semaphore信号量来控制并发执行的次数,如果超过域值信号量,则进入阻塞队列中排队等待获取信号量进行执行。如果阻塞队列中排队的请求过多超出系统处理能力,则可以在拒绝请求。

4.漏桶和令牌桶的比较

令牌桶可以在运行时控制和调整数据处理的速率,处理某时的突发流量。放令牌的频率增加可以提升整体数据处理的速度,而通过每次获取令牌的个数增加或者放慢令牌的发放速度和降低整体数据处理速度。而漏桶不行,因为它的流出速率是固定的,程序处理速度也是固定的。更多算法相关:算法聚合

分布式限流

实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。

其中 Redis 就非常适合这样的场景。

  • 每次请求时将方法名进行md5加密后作为Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
  • 当达到阈值时返回错误。
  • 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性

Lua脚本准备

local val = redis.call('incr', KEYS[1])
local ttl = redis.call('ttl', KEYS[1])
redis.log(redis.LOG_NOTICE, "incr "..KEYS[1].." "..val);
if val == 1 then
    redis.call('expire', KEYS[1], tonumber(ARGV[1]))
else
    if ttl == -1 then
        redis.call('expire', KEYS[1], tonumber(ARGV[1]))
    end
end
if val > tonumber(ARGV[2]) then
    return 0
end
return 1

RateLimiter.java

@Component
public class RateLimiter {
    @Autowired
    private RedisClient redisClient;
    @Value("${redis.limit.expire}")
    private int expire;
    @Value("${redis.limit.request.count}")
    private int reqCount;
    @Value("${redis.limit.script.name}")
    private String scriptName;
    public Long limit(String key) {
        return redisClient.eval(key, scriptName, 1, expire, reqCount);
    }
}

RateLimitAspect.java

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.lzhsite.core.exception.OverLimitException;
import com.lzhsite.core.utils.MD5Util;
import com.lzhsite.technology.redis.limit.RateLimiter;
@Aspect
@Component
public class RateLimitAspect {
    @Autowired
    private RateLimiter rateLimiter;
    @Before("@annotation(com.lzhsite.technology.redis.limit.RateLimit)")
    public void before(JoinPoint pjp) throws Throwable {
        Signature sig = pjp.getSignature();
        if (!(sig instanceof MethodSignature)) {
            throw new IllegalArgumentException("该注解只能用在方法上");
        }
        MethodSignature msig = (MethodSignature) sig;
        String methodName = pjp.getTarget().getClass().getName() + "." + msig.getName();
        String limitKey = MD5Util.md5(methodName);
        if (rateLimiter.limit(limitKey) != 1){
            throw new OverLimitException("触发限流控制");
        }
    }
}

RateLimit.java

@Target({ElementType.METHOD})  
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimit {
    String value() default "";
}
@RestController
public class ShopOrderController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ShopOrderController.class);
    @Autowired
    private ShopOrderService shopOrderService;
    @RequestMapping("/seckill")
    @RateLimit
    public String index(Long stockId) {
        try{
            if (shopOrderService.createOrder(stockId)){
                LOGGER.info(ConstantUtil.SNATCH_DEAL_SUCCESS);
                return ConstantUtil.SNATCH_DEAL_SUCCESS;
            }
        } catch (Exception e){
            LOGGER.error(e.getMessage());
            return e.getMessage();
        }
        return ConstantUtil.SYSTEM_EXCEPTION;
    }
}

当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。

完整代码

https://gitee.com/lzhcode/maven-parent/blob/78734ac309aba8f5499e0dd2eefc45c41baf0ebe/lzh-technology/src/main/java/com/lzhsite/aop/RateLimitAspect.java

参考文章

https://blog.csdn.net/sunlihuo/article/details/79700225

https://blog.csdn.net/ghaohao/article/details/80361089


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
11天前
|
存储 运维 应用服务中间件
阿里云分布式存储应用示例
通过阿里云EDAS,您可以轻松部署与管理微服务应用。创建应用时,使用`CreateApplication`接口基于模板生成新应用,并获得包含应用ID在内的成功响应。随后,利用`DeployApplication`接口将应用部署至云端,返回&quot;Success&quot;确认部署成功。当业务调整需下线应用时,调用`ReleaseApplication`接口释放资源。阿里云EDAS简化了应用全生命周期管理,提升了运维效率与可靠性。[相关链接]提供了详细的操作与返回参数说明。
|
14天前
|
存储 分布式计算 算法
探索Hadoop的三种运行模式:单机模式、伪分布式模式和完全分布式模式
在配置Hadoop集群之前,了解这三种模式的特点、适用场景和配置差异是非常重要的。这有助于用户根据个人需求和资源情况,选择最适合自己的Hadoop运行模式。在最初的学习和开发阶段,单机模式和伪分布式模式能为用户提供便利和成本效益。进而,当用户要处理大规模数据集时,完全分布式模式将是理想的选择。
37 2
|
14天前
|
机器学习/深度学习 分布式计算 PyTorch
大规模数据集管理:DataLoader在分布式环境中的应用
【8月更文第29天】随着大数据时代的到来,如何高效地处理和利用大规模数据集成为了许多领域面临的关键挑战之一。本文将探讨如何在分布式环境中使用`DataLoader`来优化大规模数据集的管理与加载过程,并通过具体的代码示例展示其实现方法。
25 1
|
16天前
|
存储 NoSQL 算法
Go 分布式令牌桶限流 + 兜底保障
Go 分布式令牌桶限流 + 兜底保障
|
18天前
|
运维 安全 Cloud Native
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
|
1月前
|
Kubernetes 安全 云计算
分布式应用的终极革命:Distributionless,告别分布式烦恼!
【8月更文挑战第8天】探讨分布式应用的进化形态——Distributionless,一种使开发者聚焦业务逻辑而非系统细节的理念。借助容器化、云计算与自动化工具的进步,分布式应用的开发与管理变得简易。透过示例展现了使用Bazel构建及Kubernetes部署的流程,预示着Distributionless模式下的应用将更加高效、可靠与安全,引领未来分布式应用的发展趋势。
49 7
|
11天前
|
开发者 云计算 数据库
从桌面跃升至云端的华丽转身:深入解析如何运用WinForms与Azure的强大组合,解锁传统应用向现代化分布式系统演变的秘密,实现性能与安全性的双重飞跃——你不可不知的开发新模式
【8月更文挑战第31天】在数字化转型浪潮中,传统桌面应用面临新挑战。本文探讨如何融合Windows Forms(WinForms)与Microsoft Azure,助力应用向云端转型。通过Azure的虚拟机、容器及无服务器计算,可轻松解决性能瓶颈,满足全球用户需求。文中还提供了连接Azure数据库的示例代码,并介绍了集成Azure Storage和Functions的方法。尽管存在安全性、网络延迟及成本等问题,但合理设计架构可有效应对,帮助开发者构建高效可靠的现代应用。
12 0
|
11天前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
20 0
|
21天前
|
监控 Java API
分布式链路监控系统问题之对Java应用实现字节码增强的方式的问题如何解决
分布式链路监控系统问题之对Java应用实现字节码增强的方式的问题如何解决
|
25天前
|
存储 分布式计算 Hadoop
分布式计算框架在大规模数据处理中的应用
【8月更文第18天】随着大数据时代的到来,对海量数据进行有效的存储、处理和分析变得越来越重要。传统的单机系统已经无法满足PB级别数据集的需求。分布式计算框架,如Apache Hadoop和Apache Spark,成为了处理这些大规模数据集的重要工具。
74 0