FlowRule,限流规则
每个规则包含三个核心概念:grade,strategy和controlBehavior
grade代表流控类型
0-通过线程数控制,1-通过QPS控制
strategy表示的是如果服务被限流,谁受到影响
controlBehavior,表示采取哪种方式计算是否要限流
0-默认策略(default or directly)-DefaultController:使用时间窗口进行QPS限流,如果未达到QPS就放行。达到QPS,而prioritized=true,会让线程sleep到下一个时间窗口在尝试。
1-warm up-warmUpController
这个差点给我搞晕了。逻辑是:会设置一个warningToken数量,当现在token桶里面剩余token数大于这个warning数量,就认为之前一段时间QPS很小,新请求全都打进来,我可能还得初始化新的数据库连接等,所以我认为这是一个冷启动的状态,这个时候能通过的请求QPS就得打折扣,免得一下给我搞坏了。但是如果token桶里面剩余的token数小于warningToken的数量,说明之前系统都处于请求很多比较活跃的状态,不需要再去加载数据库连接等,所以新进来的请求书只要不超过QPS限制就可以,这个时候限流逻辑和default一致
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
另一个点,坡度计算,我没看懂
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
2-rate limiter-RateLimiterController:和default模式相比,更侧重于从频率层面限流。比如对于Default模式,如果QPS设置为10,那1秒内不管这10个请求什么时候进来,都是可以通过的。可以在最后100ms内通过10个请求。而Rate limiter的要求是,第一个请求通过后,必须再过超过100ms才能有第二个请求通过。
还有一个点是,虽然当前还没到达时间间隔,请求无法通过,但是如果在一定等待时间以内(maxQueueingTimeMs)可以达到指定间隔,那请求线程可以sleep住,等到间隔时间到了再通过
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
3-warm up + rate limiter-WarmUpRateLimiterController
这个就字面意思,warm up的rate limiter版本,把warm up的QPS换成频率来计算限流。