【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含源码分析介绍)

简介: 【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含源码分析介绍)

Guava包中限流实现分析


RateLimiter


之前的文章中已经介绍了常用的限流算法,而google在Java领域中使用Guava包中的限流工具进行服务限流。



回顾使用案例


Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。


@Test
public void  testSample() {
    RateLimiter rateLimiter = RateLimiter.create(500)
}
复制代码


以上示例,创建一个RateLimiter,指定每秒放500个令牌(0.002秒放1个令牌),其输出见下:


从输出结果可以看出,RateLimiter具有预消费的能力:


  • 请求 1时并没有任何等待直接预消费了1个令牌
  • 请求 2时,由于之前预消费了1个令牌,故而等待了2秒,之后又预消费了6个令牌
  • 请求 3时同理,由于之前预消费了6个令牌,故而等待了12秒



属于线性处理机制。


  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
  • 但是某些情况下并不需要这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能预消费,否则,则可能出现推送频率超出限制而失败。
  • 其中RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。
  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的,RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。
  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。标准创建者模式UML图如下所示(引用自百度百科)




Guava包中限流工具类

image.png


Guava核心限流类介绍


  • RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。


Guava有两种限流模式


  • 一种为稳定模式(SmoothBursty:令牌生成速度恒定)
  • 一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)

两种模式实现思路类似,主要区别在等待时间的计算上,



Guava RateLimiter核心类实现


  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的。RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。
  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。


image.png


RateLimiter类即承担了builder的职责,也承担了Product的职责。


SmoothBursty


  • Guava包RateLimiter类的说明文档,首先使用create函数创建限流器,指定每秒生成2个令牌,在需要调用服务时使用acquire函数或取令牌。


create函数分析


  • create函数具有两个个重载,根据不同的重载可能创建不同的RateLimiter具体实现子类。
  • 目前可返回的实现子类包括SmoothBursty及SmoothWarmingUp两种,具体不同下文详细分析。
  • 在调用create接口时,实际实例化的为SmoothBursty类
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}
复制代码



在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;
/**
 * The maximum number of stored permits.
 * 最大存储令牌数
 */
double maxPermits;
/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔
 */
double stableIntervalMicros;
/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L;
// could be either in the past or future


image.png


tryAcquire函数实现机制


  • 就非常容易理解RateLimiter暴露出来的接口
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}
复制代码
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}
public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}
复制代码
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
  • tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
  • canAcquire用于判断timeout时间内是否可以获取令牌



resync函数


该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}
复制代码



acquire函数分析


acquire函数也具有两个重载类,但分析过程仅仅需要关系具有整形参数的函数重载即可,无参数的函数仅仅是acquire(1)的简便写法。


在acquire(int permits)函数中主要完成三件事:


  • 预分配授权数量,此函数返回需要等待的时间,可能为0;
  • 根据等待时间进行休眠;
  • 以秒为单位,返回获取授权消耗的时间。


完成以上工作的过程中,RateLimiter类确定了获取授权的过程骨架并且实现了一些通用的方法,这些通用方法中会调用为实现的抽象方法,开发人员根据不同的算法需求可实现特定子类对抽象方法进行覆盖。


其调用流程如下图:image.png


其中橙色块中reserveEarliestAvailable方法即为需要子类进行实现的,下文以该函数为核心,分析RateLimiter类的子类是如何实现该方法的。



final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
  double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
复制代码


  • 该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点
  • 其中,storedPermitsToSpend为桶中可以消费的令牌数,freshPermits为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros
  • 需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。



SmoothBursty的构造函数


SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}
复制代码
  • 桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
  • 该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。



抽象函数分析


在以上文代码分析中出现了两个抽象函数coolDownIntervalMicros及storedPermitsToWaitTime,现分析这两个抽象函数。


coolDownIntervalMicros函数


**主要含义为生成一个令牌需要消耗的时间,该函数主要应用于计算当前时间可产生的令牌数。根据上文的UML图SmoothRateLimiter类有两个子类SmoothBursty及SmoothWarmingUp。 **



SmoothBursty类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return stableIntervalMicros;
}
复制代码


可见实现非常简单,仅仅只是返回stableIntervalMicros属性,即产生两个令牌需要的时间间隔。

SmoothWarmingUp类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return warmupPeriodMicros / maxPermits;
}
复制代码


  • 其中maxPermits属性上文已经出现过,表示当前令牌桶的最大容量。
  • warmupPeriodMicros属性属于SmoothWarmingUp类的特有属性,表示令牌桶中令牌从0到maxPermits需要经过的时间,故warmupPeriodMicros / maxPermits表示在令牌数量达到maxPermits之前的令牌产生时间间隔。



storedPermitsToWaitTime函数


主要表示消耗存储在令牌桶中的令牌需要的时间。

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}
复制代码



直接返回0,表示消耗令牌不需要时间。

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // measuring the integral on the right part of the function (the climbing line)
  if (availablePermitsAboveThreshold > 0.0) {
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // TODO(cpovirk): Figure out a good name for this variable.
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // measuring the integral on the left part of the function (the horizontal line)
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}
复制代码



  • 实现较为复杂,其核心思想在于计算消耗当前存储令牌时需要根据预热设置区别对待。其中涉及到新变量thresholdPermits,该变量为令牌阈值,当当前存储的令牌数大于该值时,消耗(storedPermits-thresholdPermits)范围的令牌需要有预热的过程(即消耗每个令牌的间隔时间慢慢减小),而消耗0~thresholdPermits个数的以存储令牌,每个令牌消耗时间为固定值,即stableIntervalMicros。
  • 而thresholdPermits取值需要考虑预热时间及令牌产生速度两个属性,即thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;。可见阈值为预热时间中能够产生的令牌数的一半,并且根据注释计算消耗阈值以上的令牌的时间可以转换为计算预热图的梯形面积(实际为积分),本处不详细展开。
  • 使用此种设计可以保证在上次请求间隔时间较长时,令牌桶中存储了较多的令牌,当消耗这些令牌时,最开始的令牌消耗时间较长,后续时间慢慢缩短直到达到stableIntervalMicros的状态,产生预热的效果。





实现总结


  • 根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?


  • 一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。


  • 在实现限流器的过程中,基于令牌桶的思想,并且增加了带有预热器的令牌桶限流器实现。被限流的线程使用其自带的SleepingStopwatch工具类,最终使用的是Thread.sleep(ms, ns);方法,而线程使用sleep休眠时其持有的锁并不会释放,在多线程编程时此处需要注意。


  • 最后,限流器触发算法采用的是预定令牌的方式,即当前请求需要的令牌数不会对当前请求的等待时间造成影响,而是会影响下一次请求的等待时间。















相关文章
|
1月前
|
存储 监控 安全
单位网络监控软件:Java 技术驱动的高效网络监管体系构建
在数字化办公时代,构建基于Java技术的单位网络监控软件至关重要。该软件能精准监管单位网络活动,保障信息安全,提升工作效率。通过网络流量监测、访问控制及连接状态监控等模块,实现高效网络监管,确保网络稳定、安全、高效运行。
68 11
|
1月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
73 7
|
24天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
98 60
|
24天前
|
NoSQL 算法 Java
Java Redis多限流
通过本文的介绍,我们详细讲解了如何在Java中使用Redis实现三种不同的限流策略:固定窗口限流、滑动窗口限流和令牌桶算法。每种限流策略都有其适用的场景和特点,根据具体需求选择合适的限流策略可以有效保护系统资源和提高服务的稳定性。
51 18
|
1月前
|
移动开发 前端开发 Java
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
JavaFX是Java的下一代图形用户界面工具包。JavaFX是一组图形和媒体API,我们可以用它们来创建和部署富客户端应用程序。 JavaFX允许开发人员快速构建丰富的跨平台应用程序,允许开发人员在单个编程接口中组合图形,动画和UI控件。本文详细介绍了JavaFx的常见用法,相信读完本教程你一定有所收获!
Java最新图形化界面开发技术——JavaFx教程(含UI控件用法介绍、属性绑定、事件监听、FXML)
|
18天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
85 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
1月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
2月前
|
Java 程序员 数据库连接
Java中的异常处理:从入门到精通
在Java编程的海洋中,异常处理是一艘不可或缺的救生艇。它不仅保护你的代码免受错误数据的侵袭,还能确保用户体验的平稳航行。本文将带你领略异常处理的风浪,让你学会如何在Java中捕捉、处理和预防异常,从而成为一名真正的Java航海家。