什么是限流
限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统 的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
比如场景:
某天小明突然发现自己的接口请求突然之间涨到了原来的10倍,接口几乎不能使用,产生了一系列连锁反应,导致了整个系统崩溃。这就好比,老电闸中都安装了保险丝,一旦使用大功率设备,保险丝就会熔断,保证各个电器不被强电流烧坏,系统也同样安装保险丝,防止非预期请求过大,引起系统瘫痪。
限流方法
常用的限流算法有:计数法,滑动窗口计数法,漏桶算法和令牌桶算法。
漏桶算法思路
水(请求)进入到漏桶里,漏桶以一定的速度流出,当水流的速度过大会直接溢出, 漏桶是强行限制了数据的传输速率。
令牌桶算法
除了要能够限制数据的平均传输速率外,还需要允许某种程度的突发请求,令牌桶更为合适。
令牌桶的思路是以一个恒定的速率往桶里放令牌,如果请求需要被处理,则需要从桶里取出一个令牌,如果没有令牌可取,那么就拒绝服务。
Google开源工具包Guava提供了限流工具类RateLimiter是基于令牌桶算法来实现的。
public double acquire() { return acquire(1); } public double acquire(int permits) { checkPermits(permits); //检查参数是否合法(是否大于0) long microsToWait; synchronized (mutex) { //应对并发情况需要同步 microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间 } ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0 return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L); } private long reserveNextTicket(double requiredPermits, long nowMicros) { resync(nowMicros); //补充令牌 long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros; double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目 double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌 return microsToNextFreeTicket; } private void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { storedPermits = Math.min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); nextFreeTicketMicros = nowMicros; } }
计数器
控制单位时间内的请求数量
import java.util.concurrent.atomic.AtomicInteger; public class Counter { /** * 最大访问数量 */ private final int limit = 10; /** * 访问时间差 */ private final long timeout = 1000; /** * 请求时间 */ private long time; /** * 当前计数器 */ private AtomicInteger reqCount = new AtomicInteger(0); public boolean limit() { long now = System.currentTimeMillis(); if (now < time + timeout) { // 单位时间内 reqCount.getAndAdd(1); return reqCount.get() <= limit; } else { // 超出单位时间 time = now; reqCount = new AtomicInteger(0); return true; } } public static void main(String[] args) { } }
计数方式有没有问题?
假设每分钟请求数量为 60 个,每秒钟系统可以处理1个请求,用户在00:59 发送了60 个请求,然后在 1:00 发生了60个请求,此时 2 秒内有120个请求(每秒60)个请求,这样的方式并没有实现限制流量,因为每分钟可以处理60个,但是实际上这60个是一秒钟发过来的。
滑动窗口计数
滑动窗口是对计数方式对改进,增加一个时间粒度的度量单位。
把一分钟分成了若干等份,比如分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,限流统计越详细。
package ratelimit; import java.util.Iterator; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.IntStream; public class TimeWindow { private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>(); /**a * 间隔秒数 */ private int seconds; /** * 最大限流 */ private int max; public TimeWindow(int max, int seconds) { this.seconds = seconds; this.max = max; new Thread(() -> { while (true) { try { Thread.sleep((seconds - 1) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } clean(); } }).start(); } public static void main(String[] args) { final TimeWindow timeWindow = new TimeWindow(10, 1); IntStream.range(0, 3).forEach((i) -> { new Thread(() -> { try { while (true) { Thread.sleep(new Random().nextInt(20) * 100); } } catch (InterruptedException e) { e.printStackTrace(); } timeWindow.take(); }).start(); }); } public void take() { long start = System.currentTimeMillis(); int size = sizeOfValid(); if (size > max) { System.out.println("超限"); } synchronized (queue) { if (sizeOfValid() > max) { System.err.println("超限"); System.err.println("queue 中有:" + queue.size() + "最大数量:" + max); } this.queue.offer(System.currentTimeMillis()); } System.err.println("queue 中有:" + queue.size() + "最大数量:" + max); } private int sizeOfValid() { Iterator<Long> it = queue.iterator(); Long ms = System.currentTimeMillis() - seconds * 1000; int count = 0; while (it.hasNext()) { long t = it.next(); if (t > ms) { //在时间窗口范围内 count++; } } return count; } public void clean() { Long c = System.currentTimeMillis() - seconds * 1000; Long t1 = null; while ((t1 = queue.peek()) != null && t1 < c) { System.out.println("数据清理"); queue.poll(); } } }
令牌桶问题
令牌桶规定固定容量的桶,令牌 token 以固定速度往桶内填充,当桶填满时 token 不会继续放入,每过来一个请求把 token 从桶中移除,当没有 token 可以获取时,拒绝请求。
令牌桶算法
当网络设备衡量流量是否超过额定带宽时,需要查看令牌桶,而令牌桶中会放置一定数量的令牌,一个令牌允许接口发送或接收1bit数据(有时是1 Byte数据),当接口通过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流量都被视为超过额定带宽,只有当桶中有令牌时,数据才可以通过接口。令牌桶中的令牌不仅仅可以被移除,同样也可以往里添加,所以为了保证接口随时有数据通过,就必须不停地往桶里加令牌,由此可见,往桶里加令牌的速度,就决定了数据通过接口的速度。 因此,我们通过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),通常以秒为单位。比如我们设置用户的带宽为1000 bit每秒,只要保证每秒钟往桶里添加1000个令牌即可。
令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是不让自己的系统垮掉。
令牌桶算法代码
package com.netease.datastream.util.flowcontrol; import java.io.BufferedWriter; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * <pre> * Created by inter12 on 15-3-18. * </pre> */ public class TokenBucket { // 默认桶大小个数 即最大瞬间流量是64M private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64; // 一个桶的单位是1字节 private int everyTokenSize = 1; // 瞬间最大流量 private int maxFlowRate; // 平均流量 private int avgFlowRate; // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * // 1024 * 64 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>( DEFAULT_BUCKET_SIZE); private ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(); private volatile boolean isStart = false; private ReentrantLock lock = new ReentrantLock(true); private static final byte A_CHAR = 'a'; public TokenBucket() { } public TokenBucket(int maxFlowRate, int avgFlowRate) { this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) { this.everyTokenSize = everyTokenSize; this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public void addTokens(Integer tokenNum) { // 若是桶已经满了,就不再家如新的令牌 for (int i = 0; i < tokenNum; i++) { tokenQueue.offer(Byte.valueOf(A_CHAR)); } } public TokenBucket build() { start(); return this; } /** * 获取足够的令牌个数 * * @return */ public boolean getTokens(byte[] dataSize) { // Preconditions.checkNotNull(dataSize); // Preconditions.checkArgument(isStart, // "please invoke start method first !"); int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数 final ReentrantLock lock = this.lock; lock.lock(); try { boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量 if (!result) { return false; } int tokenCount = 0; for (int i = 0; i < needTokenNum; i++) { Byte poll = tokenQueue.poll(); if (poll != null) { tokenCount++; } } return tokenCount == needTokenNum; } finally { lock.unlock(); } } public void start() { // 初始化桶队列大小 if (maxFlowRate != 0) { tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate); } // 初始化令牌生产者 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this); scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS); isStart = true; } public void stop() { isStart = false; scheduledExecutorService.shutdown(); } public boolean isStarted() { return isStart; } class TokenProducer implements Runnable { private int avgFlowRate; private TokenBucket tokenBucket; public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) { this.avgFlowRate = avgFlowRate; this.tokenBucket = tokenBucket; } @Override public void run() { tokenBucket.addTokens(avgFlowRate); } } public static TokenBucket newBuilder() { return new TokenBucket(); } public TokenBucket everyTokenSize(int everyTokenSize) { this.everyTokenSize = everyTokenSize; return this; } public TokenBucket maxFlowRate(int maxFlowRate) { this.maxFlowRate = maxFlowRate; return this; } public TokenBucket avgFlowRate(int avgFlowRate) { this.avgFlowRate = avgFlowRate; return this; } private String stringCopy(String data, int copyNum) { StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); for (int i = 0; i < copyNum; i++) { sbuilder.append(data); } return sbuilder.toString(); } public static void main(String[] args) throws IOException, InterruptedException { tokenTest(); } private static void arrayTest() { ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>( 10); tokenQueue.offer(1); tokenQueue.offer(1); tokenQueue.offer(1); System.out.println(tokenQueue.size()); System.out.println(tokenQueue.remainingCapacity()); } private static void tokenTest() throws InterruptedException, IOException { TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512) .maxFlowRate(1024).build(); BufferedWriter bufferedWriter = new BufferedWriter( new OutputStreamWriter(new FileOutputStream("D:/ds_test"))); String data = "xxxx";// 四个字节 for (int i = 1; i <= 1000; i++) { Random random = new Random(); int i1 = random.nextInt(100); boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes()); TimeUnit.MILLISECONDS.sleep(100); if (tokens) { bufferedWriter.write("token pass --- index:" + i1); System.out.println("token pass --- index:" + i1); } else { bufferedWriter.write("token rejuect --- index" + i1); System.out.println("token rejuect --- index" + i1); } bufferedWriter.newLine(); bufferedWriter.flush(); } bufferedWriter.close(); } }