【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含源码分析介绍)

简介: 【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含源码分析介绍)

Guava包中限流实现分析


RateLimiter


之前的文章中已经介绍了常用的限流算法,而google在Java领域中使用Guava包中的限流工具进行服务限流。



回顾使用案例


Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。


@Test
public void  testSample() {
    RateLimiter rateLimiter = RateLimiter.create(500)
}
复制代码


以上示例,创建一个RateLimiter,指定每秒放500个令牌(0.002秒放1个令牌),其输出见下:


从输出结果可以看出,RateLimiter具有预消费的能力:


  • 请求 1时并没有任何等待直接预消费了1个令牌
  • 请求 2时,由于之前预消费了1个令牌,故而等待了2秒,之后又预消费了6个令牌
  • 请求 3时同理,由于之前预消费了6个令牌,故而等待了12秒



属于线性处理机制。


  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
  • 但是某些情况下并不需要这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能预消费,否则,则可能出现推送频率超出限制而失败。
  • 其中RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。
  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的,RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。
  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。标准创建者模式UML图如下所示(引用自百度百科)




Guava包中限流工具类

image.png


Guava核心限流类介绍


  • RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。


Guava有两种限流模式


  • 一种为稳定模式(SmoothBursty:令牌生成速度恒定)
  • 一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)

两种模式实现思路类似,主要区别在等待时间的计算上,



Guava RateLimiter核心类实现


  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的。RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。
  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。


image.png


RateLimiter类即承担了builder的职责,也承担了Product的职责。


SmoothBursty


  • Guava包RateLimiter类的说明文档,首先使用create函数创建限流器,指定每秒生成2个令牌,在需要调用服务时使用acquire函数或取令牌。


create函数分析


  • create函数具有两个个重载,根据不同的重载可能创建不同的RateLimiter具体实现子类。
  • 目前可返回的实现子类包括SmoothBursty及SmoothWarmingUp两种,具体不同下文详细分析。
  • 在调用create接口时,实际实例化的为SmoothBursty类
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}
复制代码



在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;
/**
 * The maximum number of stored permits.
 * 最大存储令牌数
 */
double maxPermits;
/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔
 */
double stableIntervalMicros;
/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L;
// could be either in the past or future


image.png


tryAcquire函数实现机制


  • 就非常容易理解RateLimiter暴露出来的接口
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}
复制代码
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}
public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}
复制代码
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
  • tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
  • canAcquire用于判断timeout时间内是否可以获取令牌



resync函数


该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}
复制代码



acquire函数分析


acquire函数也具有两个重载类,但分析过程仅仅需要关系具有整形参数的函数重载即可,无参数的函数仅仅是acquire(1)的简便写法。


在acquire(int permits)函数中主要完成三件事:


  • 预分配授权数量,此函数返回需要等待的时间,可能为0;
  • 根据等待时间进行休眠;
  • 以秒为单位,返回获取授权消耗的时间。


完成以上工作的过程中,RateLimiter类确定了获取授权的过程骨架并且实现了一些通用的方法,这些通用方法中会调用为实现的抽象方法,开发人员根据不同的算法需求可实现特定子类对抽象方法进行覆盖。


其调用流程如下图:image.png


其中橙色块中reserveEarliestAvailable方法即为需要子类进行实现的,下文以该函数为核心,分析RateLimiter类的子类是如何实现该方法的。



final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
  double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
复制代码


  • 该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点
  • 其中,storedPermitsToSpend为桶中可以消费的令牌数,freshPermits为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros
  • 需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。



SmoothBursty的构造函数


SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}
复制代码
  • 桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
  • 该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。



抽象函数分析


在以上文代码分析中出现了两个抽象函数coolDownIntervalMicros及storedPermitsToWaitTime,现分析这两个抽象函数。


coolDownIntervalMicros函数


**主要含义为生成一个令牌需要消耗的时间,该函数主要应用于计算当前时间可产生的令牌数。根据上文的UML图SmoothRateLimiter类有两个子类SmoothBursty及SmoothWarmingUp。 **



SmoothBursty类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return stableIntervalMicros;
}
复制代码


可见实现非常简单,仅仅只是返回stableIntervalMicros属性,即产生两个令牌需要的时间间隔。

SmoothWarmingUp类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return warmupPeriodMicros / maxPermits;
}
复制代码


  • 其中maxPermits属性上文已经出现过,表示当前令牌桶的最大容量。
  • warmupPeriodMicros属性属于SmoothWarmingUp类的特有属性,表示令牌桶中令牌从0到maxPermits需要经过的时间,故warmupPeriodMicros / maxPermits表示在令牌数量达到maxPermits之前的令牌产生时间间隔。



storedPermitsToWaitTime函数


主要表示消耗存储在令牌桶中的令牌需要的时间。

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}
复制代码



直接返回0,表示消耗令牌不需要时间。

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // measuring the integral on the right part of the function (the climbing line)
  if (availablePermitsAboveThreshold > 0.0) {
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // TODO(cpovirk): Figure out a good name for this variable.
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // measuring the integral on the left part of the function (the horizontal line)
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}
复制代码



  • 实现较为复杂,其核心思想在于计算消耗当前存储令牌时需要根据预热设置区别对待。其中涉及到新变量thresholdPermits,该变量为令牌阈值,当当前存储的令牌数大于该值时,消耗(storedPermits-thresholdPermits)范围的令牌需要有预热的过程(即消耗每个令牌的间隔时间慢慢减小),而消耗0~thresholdPermits个数的以存储令牌,每个令牌消耗时间为固定值,即stableIntervalMicros。
  • 而thresholdPermits取值需要考虑预热时间及令牌产生速度两个属性,即thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;。可见阈值为预热时间中能够产生的令牌数的一半,并且根据注释计算消耗阈值以上的令牌的时间可以转换为计算预热图的梯形面积(实际为积分),本处不详细展开。
  • 使用此种设计可以保证在上次请求间隔时间较长时,令牌桶中存储了较多的令牌,当消耗这些令牌时,最开始的令牌消耗时间较长,后续时间慢慢缩短直到达到stableIntervalMicros的状态,产生预热的效果。





实现总结


  • 根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?


  • 一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。


  • 在实现限流器的过程中,基于令牌桶的思想,并且增加了带有预热器的令牌桶限流器实现。被限流的线程使用其自带的SleepingStopwatch工具类,最终使用的是Thread.sleep(ms, ns);方法,而线程使用sleep休眠时其持有的锁并不会释放,在多线程编程时此处需要注意。


  • 最后,限流器触发算法采用的是预定令牌的方式,即当前请求需要的令牌数不会对当前请求的等待时间造成影响,而是会影响下一次请求的等待时间。















相关文章
|
2天前
|
SQL 监控 Java
技术前沿:Java连接池技术的最新发展与应用
本文探讨了Java连接池技术的最新发展与应用,包括高性能与低延迟、智能化管理和监控、扩展性与兼容性等方面。同时,结合最佳实践,介绍了如何选择合适的连接池库、合理配置参数、使用监控工具及优化数据库操作,为开发者提供了一份详尽的技术指南。
17 7
|
4天前
|
移动开发 前端开发 Java
过时的Java技术盘点:避免在这些领域浪费时间
【10月更文挑战第14天】 在快速发展的Java生态系统中,新技术层出不穷,而一些旧技术则逐渐被淘汰。对于Java开发者来说,了解哪些技术已经过时是至关重要的,这可以帮助他们避免在这些领域浪费时间,并将精力集中在更有前景的技术上。本文将盘点一些已经或即将被淘汰的Java技术,为开发者提供指导。
29 7
|
2天前
|
Java 数据库连接 数据库
优化之路:Java连接池技术助力数据库性能飞跃
在Java应用开发中,数据库操作常成为性能瓶颈。频繁的数据库连接建立和断开增加了系统开销,导致性能下降。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接,显著减少连接开销,提升系统性能。文章详细介绍了连接池的优势、选择标准、使用方法及优化策略,帮助开发者实现数据库性能的飞跃。
14 4
|
2天前
|
SQL Java 数据库连接
打破瓶颈:利用Java连接池技术提升数据库访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,避免了频繁的连接建立和断开,显著提升了数据库访问效率。常见的连接池库包括HikariCP、C3P0和DBCP,它们提供了丰富的配置选项和强大的功能,帮助优化应用性能。
15 2
|
3天前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
19 3
|
4天前
|
前端开发 Java API
过时Java技术的退役:这些技能你不再需要掌握!
【10月更文挑战第22天】 在快速变化的技术领域,一些曾经流行的Java技术已经逐渐被淘汰,不再适用于现代软件开发。了解这些过时的技术对于新手开发者来说尤为重要,以避免浪费时间和精力学习不再被行业所需的技能。本文将探讨一些已经或即将被淘汰的Java技术,帮助你调整学习路径,专注于那些更有价值的技术。
14 1
|
6月前
|
安全 Java
从零开始学习 Java:简单易懂的入门指南之不可变集合、方法引用(二十六)
从零开始学习 Java:简单易懂的入门指南之不可变集合、方法引用(二十六)
|
5月前
|
存储 Java API
Java——Stream流(1/2):Stream流入门、Stream流的创建(认识Stream、体验Stream流、Stream流的使用步骤、获取Stream流的方法)
Java——Stream流(1/2):Stream流入门、Stream流的创建(认识Stream、体验Stream流、Stream流的使用步骤、获取Stream流的方法)
75 0
|
Java 索引
从零开始学习 Java:简单易懂的入门指南之方法(六)
方法的概念:方法(method)是程序中最小的执行单元注意:方法必须先创建才可以使用,该过程成为方法定义,方法创建后并不是直接可以运行的,需要手动使用后,才执行,该过程成为方法调用
从零开始学习 Java:简单易懂的入门指南之方法(六)