​什么是限流,如何限流

简介: ​什么是限流,如何限流

什么是限流


限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统 的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。


比如场景:

某天小明突然发现自己的接口请求突然之间涨到了原来的10倍,接口几乎不能使用,产生了一系列连锁反应,导致了整个系统崩溃。这就好比,老电闸中都安装了保险丝,一旦使用大功率设备,保险丝就会熔断,保证各个电器不被强电流烧坏,系统也同样安装保险丝,防止非预期请求过大,引起系统瘫痪。


限流方法


常用的限流算法有:计数法,滑动窗口计数法,漏桶算法和令牌桶算法。


漏桶算法思路

水(请求)进入到漏桶里,漏桶以一定的速度流出,当水流的速度过大会直接溢出, 漏桶是强行限制了数据的传输速率。

image.png

令牌桶算法


除了要能够限制数据的平均传输速率外,还需要允许某种程度的突发请求,令牌桶更为合适。

令牌桶的思路是以一个恒定的速率往桶里放令牌,如果请求需要被处理,则需要从桶里取出一个令牌,如果没有令牌可取,那么就拒绝服务。

image.png

Google开源工具包Guava提供了限流工具类RateLimiter是基于令牌桶算法来实现的。

public double acquire() {
        return acquire(1);
    }
 public double acquire(int permits) {
        checkPermits(permits);  //检查参数是否合法(是否大于0)
        long microsToWait;
        synchronized (mutex) { //应对并发情况需要同步
            microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间 
        }
        ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }
private long reserveNextTicket(double requiredPermits, long nowMicros) {
        resync(nowMicros); //补充令牌
        long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros); 
        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
        this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
        return microsToNextFreeTicket;
    }
private void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }


计数器

控制单位时间内的请求数量

import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
    /**
     * 最大访问数量
     */
    private final int limit = 10;
    /**
     * 访问时间差
     */
    private final long timeout = 1000;
    /**
     * 请求时间
     */
    private long time;
    /**
     * 当前计数器
     */
    private AtomicInteger reqCount = new AtomicInteger(0);
    public boolean limit() {
        long now = System.currentTimeMillis();
        if (now < time + timeout) {
            // 单位时间内
            reqCount.getAndAdd(1);
            return reqCount.get() <= limit;
        } else {
            // 超出单位时间
            time = now;
            reqCount = new AtomicInteger(0);
            return true;
        }
    }
    public static void main(String[] args) {
    }
}
计数方式有没有问题?


假设每分钟请求数量为 60 个,每秒钟系统可以处理1个请求,用户在00:59 发送了60 个请求,然后在 1:00 发生了60个请求,此时 2 秒内有120个请求(每秒60)个请求,这样的方式并没有实现限制流量,因为每分钟可以处理60个,但是实际上这60个是一秒钟发过来的。

滑动窗口计数


滑动窗口是对计数方式对改进,增加一个时间粒度的度量单位。

把一分钟分成了若干等份,比如分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,限流统计越详细。

package ratelimit;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
public class TimeWindow {
    private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
    /**a
     * 间隔秒数
     */
    private int seconds;
    /**
     * 最大限流
     */
    private int max;
    public TimeWindow(int max, int seconds) {
        this.seconds = seconds;
        this.max = max;
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep((seconds - 1) * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                clean();
            }
        }).start();
    }
    public static void main(String[] args) {
        final TimeWindow timeWindow = new TimeWindow(10, 1);
        IntStream.range(0, 3).forEach((i) -> {
            new Thread(() -> {
                try {
                    while (true) {
                        Thread.sleep(new Random().nextInt(20) * 100);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                timeWindow.take();
            }).start();
        });
    }
    public void take() {
        long start = System.currentTimeMillis();
        int size = sizeOfValid();
        if (size > max) {
            System.out.println("超限");
        }
        synchronized (queue) {
            if (sizeOfValid() > max) {
                System.err.println("超限");
                System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);
            }
            this.queue.offer(System.currentTimeMillis());
        }
        System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);
    }
    private int sizeOfValid() {
        Iterator<Long> it = queue.iterator();
        Long ms = System.currentTimeMillis() - seconds * 1000;
        int count = 0;
        while (it.hasNext()) {
            long t = it.next();
            if (t > ms) {
                //在时间窗口范围内
                count++;
            }
        }
        return count;
    }
    public void clean() {
        Long c = System.currentTimeMillis() - seconds * 1000;
        Long t1 = null;
        while ((t1 = queue.peek()) != null && t1 < c) {
            System.out.println("数据清理");
            queue.poll();
        }
    }
}


令牌桶问题


令牌桶规定固定容量的桶,令牌 token 以固定速度往桶内填充,当桶填满时 token 不会继续放入,每过来一个请求把 token 从桶中移除,当没有 token 可以获取时,拒绝请求。


令牌桶算法


当网络设备衡量流量是否超过额定带宽时,需要查看令牌桶,而令牌桶中会放置一定数量的令牌,一个令牌允许接口发送或接收1bit数据(有时是1 Byte数据),当接口通过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流量都被视为超过额定带宽,只有当桶中有令牌时,数据才可以通过接口。令牌桶中的令牌不仅仅可以被移除,同样也可以往里添加,所以为了保证接口随时有数据通过,就必须不停地往桶里加令牌,由此可见,往桶里加令牌的速度,就决定了数据通过接口的速度。 因此,我们通过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),通常以秒为单位。比如我们设置用户的带宽为1000  bit每秒,只要保证每秒钟往桶里添加1000个令牌即可。

image.png

令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是不让自己的系统垮掉。

令牌桶算法代码

package com.netease.datastream.util.flowcontrol;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * <pre>
 * Created by inter12 on 15-3-18.
 * </pre>
 */
public class TokenBucket {
    // 默认桶大小个数 即最大瞬间流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
    // 一个桶的单位是1字节
    private int everyTokenSize = 1;
    // 瞬间最大流量
    private int maxFlowRate;
    // 平均流量
    private int avgFlowRate;
    // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
    // 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
            DEFAULT_BUCKET_SIZE);
    private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor();
    private volatile boolean isStart = false;
    private ReentrantLock lock = new ReentrantLock(true);
    private static final byte A_CHAR = 'a';
    public TokenBucket() {
    }
    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }
    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }
    public void addTokens(Integer tokenNum) {
        // 若是桶已经满了,就不再家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(Byte.valueOf(A_CHAR));
        }
    }
    public TokenBucket build() {
        start();
        return this;
    }
    /**
     * 获取足够的令牌个数
     * 
     * @return
     */
    public boolean getTokens(byte[] dataSize) {
//        Preconditions.checkNotNull(dataSize);
//        Preconditions.checkArgument(isStart,
//                "please invoke start method first !");
        int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
            if (!result) {
                return false;
            }
            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }
            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }
    public void start() {
        // 初始化桶队列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }
        // 初始化令牌生产者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
                TimeUnit.SECONDS);
        isStart = true;
    }
    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }
    public boolean isStarted() {
        return isStart;
    }
    class TokenProducer implements Runnable {
        private int avgFlowRate;
        private TokenBucket tokenBucket;
        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }
        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }
    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }
    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }
    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }
    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }
    private String stringCopy(String data, int copyNum) {
        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }
        return sbuilder.toString();
    }
    public static void main(String[] args) throws IOException,
            InterruptedException {
        tokenTest();
    }
    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
                10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }
    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
                .maxFlowRate(1024).build();
        BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
        String data = "xxxx";// 四个字节
        for (int i = 1; i <= 1000; i++) {
            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
                    i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }
            bufferedWriter.newLine();
            bufferedWriter.flush();
        }
        bufferedWriter.close();
    }
}
令牌桶和漏桶的选择问题
相关文章
|
1天前
|
算法 NoSQL Java
服务、服务间接口限流实现
`shigen`是一位坚持更新博客的写手,专注于记录个人成长、分享认知与感动。本文探讨了接口限流的重要性,通过实例分析了在调用第三方API时遇到的“请求过多”问题及其解决方法,包括使用`Thread.sleep()`和`Guava RateLimiter`进行限流控制,以及在分布式环境中利用Redis实现更高效的限流策略。
8 0
服务、服务间接口限流实现
|
5月前
|
缓存 Java 应用服务中间件
常见的限流降级方案
【1月更文挑战第21天】
|
Java Spring
springcloud gateway sential 限流 自定义参数限流执行顺序问题
springcloud gateway sential 限流 自定义参数限流执行顺序问题
148 1
|
Java 开发者 Sentinel
Sentinel 手动实现限流规则 | 学习笔记
快速学习 Sentinel 手动实现限流规则
235 0
|
算法 NoSQL JavaScript
服务限流,我有6种实现方式…
服务限流,我有6种实现方式…
|
监控 Sentinel 微服务
【Sentinel】流控效果与热点参数限流
【Sentinel】流控效果与热点参数限流
361 0
【Sentinel】流控效果与热点参数限流
|
缓存 NoSQL 算法
限流实现-专题一
在实际业务中,经常会碰到突发流量的情况。如果公司基础架构做的不好,服务无法自动扩容缩容,在突发高流量情况下,服务会因为压力过大而崩溃。更恐怖的是,服务崩溃如同多米诺骨牌,一个服务出问题,可能影响到整个公司所有组的业务。
|
缓存 算法 网络协议
限流实现2
剩下的几种本来打算能立即写完,没想到一下三个月过去了,很是尴尬。本次主要实现如下两种算法 - 令牌桶算法 - 漏斗算法
|
算法 NoSQL API
限流功能的实现
限流功能的实现
186 0
|
Java 开发者 Spring
服务保护、服务限流、服务降级的概念|学习笔记
快速学习服务保护、服务限流、服务降级的概念
192 0