限流算法 - 基本实现

简介: 限流算法 - 基本实现


在开发高并发系统时有三把利器用来保护系统:缓存、降级、 限流 , 今天我们就谈谈限流

缓存:缓存的目的是提升系统访问速度和增大系统处理容量 降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行 限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

我下面算法的实现基本上都用到了定时器Timer , 其实关于时间的也可以不用定时器, 可以看看GuavaRateLimiter, 定时器的好处是我不用处理时间逻辑 , 但是需要消耗一个线程去执行逻辑 , 当逻辑算力压力过大会线程处理不过来,效果不好 , 可以使用一下 ScheduledThreadPoolExecutor 线程池来执行,降低压力

同时还大量使用了队列数据结构 ,是因为生产者消费者模型大多需要队列, 先进先出的特点

第一节是环境搭建 , 写出需求 ,和接口要求 , 和测试用例 ,后面四节就是基本算法

1. 环境搭建

我们模拟Filter#doFilter 接口进行测试 , 全部实现 AbstractLimiter#limit方法

Filter 实现

public interface Filter {
    default public void init() {
    }
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain);
    default public void destroy() {
    }
}
复制代码

FilterChain 实现

public interface FilterChain {
    void doFilter(ServletRequest request, ServletResponse response);
}
复制代码

ServletRequest 实现

public class ServletRequest {
    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public String toString() {
        return "ServletRequest{" +
                "msg='" + msg + '\'' +
                '}';
    }
    public ServletRequest(String msg) {
        this.msg = msg;
    }
}
复制代码

ServletResponse 实现

public class ServletResponse {
}
复制代码

AbstractLimiter 实现

public abstract class AbstractLimiter {
    /**
     * 最大流量
     */
    protected final int MAX_FlOW;
    /**
     * 构造器 , 输入每秒最大流量
     * @param MAX_FlOW 最大流量
     */
    public AbstractLimiter(int MAX_FlOW) {
        this.MAX_FlOW = MAX_FlOW;
    }
    /**
     * 具体实现的方法
     * @param request 请求
     * @param response 响应
     * @param chain 执行
     */
    public abstract void limit(ServletRequest request, ServletResponse response, FilterChain chain);
}
复制代码

Demo 测试类

public class Demo {
    @Test
    public void test() {
        
        // 过滤器
        Filter filter = new Filter() {
            AbstractLimiter limit = null;
            @Override
            public void init() {
                // 入口 ,我们都是每秒限制 100个请求
                limit = new LeakyBucketLimiter(100);
            }
            @Override
            public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
                limit.limit(request, response, chain);
            }
        };
        // 过滤器初始化
        filter.init();
        // 计时器
        long start = System.currentTimeMillis();
        
        // 计数器
        AtomicInteger integer = new AtomicInteger(0);
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 模拟4000次请求
        IntStream.range(0, 4000).forEach(e -> {
            try {
                // 模拟请求延迟
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e1) {
                //
            }
            // 多线程执行
            pool.execute(()->{
                filter.doFilter(new ServletRequest("" + e), new ServletResponse(), new FilterChain() {
                    @Override
                    public void doFilter(ServletRequest request, ServletResponse response) {
                        // 回调接口
                        integer.incrementAndGet();
                        System.out.println("请求 : "+request.getMsg() + " 通过, 执行线程 "+Thread.currentThread().getName());
                    }
                });
            });
        });
        System.out.println("总耗时" + (System.currentTimeMillis() - start));
        System.out.println("一共通过 : " + integer.get());
    }
}
复制代码

2. 计数器算法

计数器算法(Counter) 顾明思议就是一个计数器 , 比如我每秒可以通过100个请求 , 我呢每进来一个请求, 我就将计数器+1 , 当计数器到达了100,此时我就不让请求过去 , 但是他存在一个问题 : 比如我第999ms 的时候过来100个请求 , 当刚刚过了1000ms的时候初始化了,但是又来了100个请求 , 此时就会发生实际上在这0.1S的时候处理了200个请求 , 严重超载了 , 此时服务器处理不了而全部都请求超时了....

public class CounterLimiter extends AbstractLimiter {
    private static final Integer initFlow = 0;
    private final AtomicInteger flow;
    public CounterLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
    
        // 初始化计数器
        flow = new AtomicInteger(initFlow);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // 每1000ms初始化一次
                flow.set(initFlow);
            }
        }, 0, 1000);
    }
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        // 比较是否超载
        if (flow.get() < MAX_FlOW) {
    // 通过 : 计数器+1
            flow.incrementAndGet();
            chain.doFilter(request, response);
        }
    }
}
复制代码

3. 滑动窗口算法

滑动窗口算法(Rolling - Window) 可以说是计数器算法的一种改进 , 他呢 , 将计算器细分了, 比如我将1S的 1000ms 细分为10个 100ms , 我们就有10个计数器 , 比如上面的问题 , 999ms和1000ms的问题, 由于我们是连续的, 此时1000ms进来的我也算进去了, 此时就不会出现那种情况 ,

当我们的颗粒度越高 , 此时所计算的资源会越多,也会越精确 , 其实对比 Hystrixsentinel 都是这种思想, 滑动窗口算法 , 主要是考虑的计算资源少的问题 ,

我的算法并不是最优 ,其实不需要使用ArrayBlockingQueue去维护滑块 , 由于我们是单个线程去执行并不会出现多线程问题, 其实可以使用 LinkedList 来模拟队列 , 还有其他点也可以看一下

public class RollingWindowFilter extends AbstractLimiter {
    /**
     * 我们的滑动窗口对象,包含多个窗口
     */
    private final Slider slider;
    /**
     * 程序中暴露的唯一一个计数器,可以称之为当前窗口
     */
    private AtomicInteger counter;
    /**
     * 计数器初始化大小
     */
    private static final int INIT_SIZE = 0;
    /**
     * 比如窗口分为10块,这个代表先进入9块窗口的计算值 , 为什么要引入是因为不浪费计算资源, 好多都是重复计算
     */
    private final AtomicInteger preCount;
    /**
     * 我们默认队列大小是 20 ,其实颗粒度很高了50ms计算一次, 可以重载构造参数调整
     *
     * @param MAX_FlOW 最大流量
     */
    public RollingWindowFilter(int MAX_FlOW) {
        super(MAX_FlOW);
        // 初始化窗口,感觉改名字叫做Windows比较好 ....
        slider = new Slider(20);
        // 初始化对象
        preCount = new AtomicInteger(INIT_SIZE);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                ArrayBlockingQueue<AtomicInteger> queue = slider.blocks;
                // 当前窗口大小
                int size = queue.size();
                /**
                 * 初始化窗口长度
                 */
                if (size < slider.capacity) {
                    try {
                        /**
                         * 计算前面窗口的计数器总和
                         * 
                         * 这里其实由多线程的并发问题 ,其实可以设置一个标识符来表示完成与否 .. 我懒得改了 ,或者你就大量实例化对象,不用我这个单一对象
                         */
                        preCount.set(INIT_SIZE);
                        if (size > 0) {
                            queue.forEach(e -> preCount.addAndGet(e.get()));
                        }
                        // 新建一个计数器, 放入对应的滑块 ,其实就是队尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }
                /**
                 * 当窗口长度初始化完成
                 */
                if (size == slider.capacity) {
                    try {
                        // 出局最先进来的那个
                        queue.take();
                        // 计算前面窗口的计数器总和 , 有多线程并发问题
                        preCount.set(INIT_SIZE);
                        queue.forEach(e -> preCount.addAndGet(e.get()));
                        
                        // 新建一个计数器, 放入对应的滑块 ,其实就是队尾
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }
            }
        }, 0, 1000 / slider.capacity);
    }
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        int cur = counter.get();
        int pre = preCount.get();
        int sum = cur + pre;
        if (sum < MAX_FlOW) {
            counter.incrementAndGet();
            chain.doFilter(request, response);
        }
    }
    /**
     * 滑块组成 , 一个队列维护一个块 , 其实可以用LinkedList来维护 , 我是懒得改
     * <p>
     * 一般内部类来说看JDK源码你会发现都会用private static修饰 ,因为反射不是静态内部类,无法实例化 , 和构造器不加修饰
     */
    private static class Slider {
        // 多少个计数器
        private final int capacity;
        // 放置计数器
        private final ArrayBlockingQueue<AtomicInteger> blocks;
        Slider(int capacity) {
            this.blocks = new ArrayBlockingQueue<>(capacity);
            this.capacity = capacity;
        }
    }
}
复制代码

4. 漏桶算法

其实所谓的漏桶算法(Leaky Bucket),我们想一下 , 有一个入水口和一个出水口 , 我们这俩口控制权在谁那 ,入水口无非就是大量的请求, 出水口就是我们放过的请求 , 所以他是一个生产者 - 消费者模型 , 生产者就是请求 , 消费者就是以一定速度我们消费请求 ,

漏桶算法可以使请求流出的速率是均匀的, 不管你多少请求 , 我流出的速率是均匀的 , 当桶满了就溢出 ,没有满加进来就等着被流出去

当你看懂我上面的两段话 , 你就理解了下面的代码 , 我的注释十分清晰

public class LeakyBucketLimiter extends AbstractLimiter {
    /**
     * 我们的漏斗
     */
    private final LeakyBucket leakyBucket;
    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public LeakyBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.leakyBucket = new LeakyBucket(MAX_FlOW);
    }
    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        try {
            // 1. 获取桶当前水的大小
            int size = leakyBucket.bucket.size();
            // 2. 比较桶里的水是否满了
            if (size < leakyBucket.waterSize) {
                // 没有满我们就将水放进去,其实这里put也行 , offer也行 , 看需求
                leakyBucket.bucket.put(new Water(request, response, chain));
            }
        } catch (InterruptedException e) {
            //
        }
    }
    static class LeakyBucket {
        /**
         * 能放多少水,其实就是队列大小
         */
        final int waterSize;
        /**
         * 我们的放水的桶
         */
        final ArrayBlockingQueue<Water> bucket;
        public LeakyBucket(int MAX_FlOW) {
            this.waterSize = MAX_FlOW;
            bucket = new ArrayBlockingQueue<>(this.waterSize);
            /**
             * 模拟消费 , 1S只能过去100个 ,说明 100ms 可以消耗10个, 看你的颗粒度
             */
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    // 100ms 流出去10个
                    for (int i = 0; i < (waterSize / 10); i++) {
                        try {
                            // 流出的水
                            Water water = bucket.take();
                            // 执行掉
                            water.chain.doFilter(water.request, water.response);
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }
    /**
     * 我们的节点对象, 其实可以称之为 成功注入的水 , 等着被漏桶流出去
     */
    static class Water {
        private ServletRequest request;
        private ServletResponse response;
        private FilterChain chain;
        public Water(ServletRequest request, ServletResponse response, FilterChain chain) {
            this.request = request;
            this.response = response;
            this.chain = chain;
        }
    }
}
复制代码

5. 令牌桶算法

令牌桶算法(Token Bucket) 是与漏桶算法相反的思想, 他也是生产者消费者模型 ,只是角色的互换, 他呢是我们去控制生成 , 请求去执行消费 , 举个栗子 : 比如我们限流100 , 此时我们就每100ms生成10个令牌 , 当令牌数达到100 我们就不生产 了, 当一个请求过来 , 就会去拿掉一个令牌 , 如果拿到了就通过了, 拿不到就拒绝

根据这个我们可以和漏桶算法做比较 ,假设都是刚刚开始 , 此时都是100个请求过来 , 令牌桶可能会拒绝掉90个,因为我只生产了10个令牌 ,但是漏桶呢他不会, 他会将100个请求全部放进去慢慢消费 , 是因为我的桶容量是100,可以放进去这么多请求 , 这就是这俩的区别 .... 其实稳定了几乎么区别

生产者消费者模型 的思想转换可以更加理清思路 , 模型的选择有时候是解决问题的一个合适的方式

令牌桶算法 网上大多都是采用的 GuavaRateLimiter实现的 , 这里我就实现两种 一种是自己实现, 一种是使用RateLimiter,

1. 自己实现的令牌桶

public class TokenBucketLimiter extends AbstractLimiter {
    /**
     * 令牌桶
     */
    private final TokenBucket tokenBucket;
    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public TokenBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.tokenBucket = new TokenBucket(MAX_FlOW);
    }
    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 这里我们就不使用 take的阻塞思想了 ,直接poll去拉去 ,然后等待5mS ,  如果拉去不到直接返回失败 , 其实等待的长了点
         */
        try {
            // 尝试去获取一个令牌
            Token token = tokenBucket.bucket.poll(5, TimeUnit.MILLISECONDS);
            
            // 拿到通过
            if (null != token) {
                chain.doFilter(request, response);
            }
        } catch (InterruptedException e) {
            //
        }
    }
    /**
     * 令牌桶
     */
    private static class TokenBucket {
        /**
         * 令牌存放的位置 , 用一个队列维护
         */
        private final ArrayBlockingQueue<Token> bucket;
        /**
         * 桶最多存放多少个令牌
         */
        private final int tokenSize;
        public TokenBucket(int MAX_FlOW) {
            this.tokenSize = MAX_FlOW;
            this.bucket = new ArrayBlockingQueue<>(this.tokenSize);
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    for (int x = 0; x < (tokenSize / 10); x++) {
                        try {
                            if (bucket.size() < tokenSize) {
                                // 定时放入令牌
                                bucket.put(new Token());
                            }
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }
    /**
     * 令牌
     */
    private static class Token {
    }
}
复制代码

2. 基于Guava 的 RateLimiter实现令牌桶

public class GuavaRateLimiter extends AbstractLimiter {
    /**
     * 令牌桶
     */
    private final RateLimiter limiter;
    /**
     * 每次需要的令牌个数
     */
    private static final int ACQUIRE_NUM = 1;
    /**
     * 最长等待时间
     */
    private static final int WAIT_TIME_PER_MILLISECONDS = 5;
    /**
     * 构造器 , 输入每秒最大流量
     *
     * @param MAX_FlOW 最大流量
     */
    public GuavaRateLimiter(final int MAX_FlOW) {
        super(MAX_FlOW);
        limiter = RateLimiter.create(MAX_FlOW);
    }
    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        /**
         * 意思就是 我尝试去获取1个令牌 ,最大等待时间是 5 ms , 其实太长了, 真是开发也就1ms不到
         */
        boolean flag = limiter.tryAcquire(ACQUIRE_NUM, WAIT_TIME_PER_MILLISECONDS, TimeUnit.MILLISECONDS);
        if (flag) {
            chain.doFilter(request, response);
        }
    }
}


目录
相关文章
|
7月前
|
数据采集 算法 双11
高并发的场景下,不能不说的限流算法
高并发的场景下,不能不说的限流算法
92 1
|
4月前
|
算法 NoSQL Java
spring cloud的限流算法有哪些?
【8月更文挑战第18天】spring cloud的限流算法有哪些?
88 3
|
5月前
|
算法 NoSQL Java
常用的限流算法有哪些?你听说过几种?
限流,就是指限制流量请求的频次。在高并发情况下,它是一种保护系统的策略,避免了在流量高峰时系统崩溃,造成系统的不可用。
72 1
|
5月前
|
存储 算法 Java
高并发架构设计三大利器:缓存、限流和降级问题之滑动日志算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之滑动日志算法问题如何解决
|
5月前
|
算法 Java 调度
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
|
5月前
|
缓存 算法 Java
高并发架构设计三大利器:缓存、限流和降级问题之使用代码实现漏桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用代码实现漏桶算法问题如何解决
|
5月前
|
算法 UED 缓存
高并发架构设计三大利器:缓存、限流和降级问题之滑动窗口算法适用于哪些场景
高并发架构设计三大利器:缓存、限流和降级问题之滑动窗口算法适用于哪些场景
|
5月前
|
存储 算法 缓存
高并发架构设计三大利器:缓存、限流和降级问题之滑动窗口算法的原理是什么
高并发架构设计三大利器:缓存、限流和降级问题之滑动窗口算法的原理是什么
|
5月前
|
算法 API 缓存
高并发架构设计三大利器:缓存、限流和降级问题之固定窗口限流算法的原理是什么
高并发架构设计三大利器:缓存、限流和降级问题之固定窗口限流算法的原理是什么
|
5月前
|
监控 算法 Java
详解 Java 限流接口实现问题之避免令牌桶限流算法可能导致的过载问题如何解决
详解 Java 限流接口实现问题之避免令牌桶限流算法可能导致的过载问题如何解决