【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含实战和原理分析)

简介: 【Java技术指南】「并发编程专题」Guava RateLimiter针对于限流器的入门到精通(含实战和原理分析)

并发编程的三剑客


在开发高并发系统时有三剑客:缓存、降级和限流。


  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量。
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开。
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。




限流的思想


溢出思想:

就是用一个固定大小的队列。比如设置限流为5qps,1s可以接受5个请求;那我们就造一个大小为5的队列,如果队列为满了,就拒绝请求;如果队列未满,就往队列添加请求。


速度控制

令牌听起来挺酷的。以固定的速率往桶里发放令牌。然后消费者每次要取到令牌(acquire)才可以响应请求,控制速率呢,我们通过控制消费者的消费速率是5qps,1s消费5个即可。




限流的算法


常用的限流算法有两种:漏桶算法和令牌桶算法及滑动窗口(计数器)算法等。


计数限流算法

无论固定窗口还是滑动窗口核心均是对请求进行计数,区别仅仅在于对于计数时间区间的处理。



固定窗口计数
image.png


实现原理


  • 固定窗口计数法思想比较简单,只需要确定两个参数:计数周期T及周期内最大访问(调用)数N。请求到达时使用以下流程进行操作:
  • 固定窗口计数实现简单,并且只需要记录上一个周期起始时间与周期内访问总数,几乎不消耗额外的存储空间。



算法缺陷


固定窗口计数缺点也非常明显,在进行周期切换时,上一个周期的访问总数会立即置为0,这可能导致在进行周期切换时可能出现流量突发



令牌桶算法


令牌桶算法的原理:系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。


image.png


  • 令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
  • 桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。
  • 当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。
  • 如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。



优点


由于令牌是固定间隔发放的,假设还是5qps,如果我有1s内没有请求,我的令牌桶就满了,可以一瞬间响应5个请求(一次过取5个令牌),也就是可以应对瞬时流量。


image.png


漏桶算法


  • 漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。


image.png



  • 如上图就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。
  • 当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。



漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。




令牌桶和漏桶对比


  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;
  • 漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。




信号量的应用


  • 操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
  • 信号量的本质是控制某个资源可被同时访问的个数,在一定程度上可以控制某资源的访问频率,但不能精确控制。




限流的思想


Guava中的RateLimiter可以限制单进程中某个方法的速率,本文主要介绍如何使用,实现原理请参考文档:推荐:超详细的Guava RateLimiter限流原理解析和推荐:RateLimiter 源码分析(Guava 和 Sentinel 实现)。




Guava RateLimiter


Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便。


原理:Guava RateLimiter基于令牌桶算法,


  • RateLimiter系统限制QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌。
  • 然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。



Guava RateLimiter 控制操作


Guava RateLimiter 限速手段


  • RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。



Maven配置

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0-jre</version>
</dependency>
复制代码


Java简单案例

public class RateLimiterService {
    // 每秒发出5个令牌
    RateLimiter rateLimiter = RateLimiter.create(5);
    /**
     * 尝试获取令牌
     */
    public boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }
  public  void acquire() {
        rateLimiter.acquire();
    }
   public static void main(String[] args){
        if (accessLimitService.tryAcquire()) {
            log.info("start");
            // 模拟业务执行500毫秒
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "access success [" + LocalDateTime.now() + "]";
        } else {
            //log.warn("限流");
            return "access limit [" + LocalDateTime.now() + "]";
        }
      }
}
public void testMethod(){
  ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            if (rateLimiter.tryAcquire()) {
                try {
                    log.info("start");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
            } else {
                log.warn("限流");
            }
        }));
public void testMethod2(){
  ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            rateLimiter.acquire();
            log.info("start");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        pool.shutdown();
      }
  }
}
复制代码
public class GuavaRateLimiter {
    public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<String, RateLimiter>();
    //初始化限流工具RateLimiter
    static {
        createResourceRateLimiter("order", 50);
    }
    public static void createResourceRateLimiter(String resource, double qps) {
        if (resourceRateLimiter.contains(resource)) {
            resourceRateLimiter.get(resource).setRate(qps);
        } else {
            //创建限流工具,每秒发出50个令牌指令
            RateLimiter rateLimiter = RateLimiter.create(qps);
            resourceRateLimiter.putIfAbsent(resource, rateLimiter);
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 5000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //如果获得令牌指令,则执行业务逻辑
                    if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) {
                        System.out.println("执行业务逻辑");
                    } else {
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}
复制代码



方法摘要


限流及创建方法

image.png



create方法
public static RateLimiter create(double permitsPerSecond)
复制代码

根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)。

The returned RateLimiter ensures that on average no more than permitsPerSecond are issued during any given second, with sustained requests being smoothly spread over each second. When the incoming request rate exceeds permitsPerSecond the rate limiter will release one permit every (1.0 / permitsPerSecond) seconds. When the rate limiter is unused, bursts of up to permitsPerSecond permits will be allowed, with subsequent requests being smoothly limited at the stable rate of permitsPerSecond.

返回的RateLimiter


  • 确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。
  • 当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。
  • 当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。



参数:


  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • 抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)
复制代码

根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率(只要存在足够请求数来使其饱和)。同样地,如果RateLimiter 在warmupPeriod时间内闲置不用,它将会逐步地返回冷却状态。也就是说,它会像它第一次被创建般经历同样的预热期。返回的RateLimiter 主要用于那些需要预热期的资源,这些资源实际上满足了请求(比如一个远程服务),而不是在稳定(最大)的速率下可以立即被访问的资源。返回的RateLimiter 在冷却状态下启动(即预热期将会紧跟着发生),并且如果被长期闲置不用,它将回到冷却状态。



参数:


  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
  • unit – 参数warmupPeriod 的时间单位



抛出:


  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0



限流及阻塞方法


acquire

image.png


public double acquire()
复制代码

从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求。如果存在等待的情况的话,告诉调用者获取到该请求所需要的睡眠时间。该方法等同于acquire(1)。



返回:

time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0



acquire
public double acquire(int permits)
复制代码


从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求数。如果存在等待的情况的话,告诉调用者获取到这些请求数所需要的睡眠时间。

参数:
  • permits – 需要获取的许可数
返回:
  • 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0




tryAcquire


public boolean tryAcquire(long timeout,TimeUnit unit)
复制代码

从RateLimiter获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)。该方法等同于tryAcquire(1, timeout, unit)。



参数:
  • timeout – 等待许可的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位



返回:
  • true表示获取到许可,反之则是false



抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0



tryAcquire


public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
复制代码

从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)。



参数:


  • permits – 需要获取的许可数
  • timeout – 等待许可数的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位



返回:


  • true表示获取到许可,反之则是false




限流及状态设置


image.png



public final void setRate(double permitsPerSecond)
复制代码


更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。调用该方法后,当前限制线程不会被唤醒,因此他们不会注意到最新的速率;只有接下来的请求才会。需要注意的是,由于每次请求偿还了(通过等待,如果需要的话)上一次请求的开销,这意味着紧紧跟着的下一个请求不会被最新的速率影响到,在调用了setRate 之后;它会偿还上一次请求的开销,这个开销依赖于之前的速率。RateLimiter的行为在任何方式下都不会被改变,比如如果 RateLimiter 有20秒的预热期配置,在此方法被调用后它还是会进行20秒的预热。



参数:
  • permitsPerSecond – RateLimiter的新的稳定速率


抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0


public final double getRate()
复制代码


返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数。它的初始值相当于构造这个RateLimiter的工厂方法中的参数permitsPerSecond ,并且只有在调用setRate(double)后才会被更新。





















相关文章
|
11天前
|
JSON 前端开发 JavaScript
java-ajax技术详解!!!
本文介绍了Ajax技术及其工作原理,包括其核心XMLHttpRequest对象的属性和方法。Ajax通过异步通信技术,实现在不重新加载整个页面的情况下更新部分网页内容。文章还详细描述了使用原生JavaScript实现Ajax的基本步骤,以及利用jQuery简化Ajax操作的方法。最后,介绍了JSON作为轻量级数据交换格式在Ajax应用中的使用,包括Java中JSON与对象的相互转换。
25 1
|
16天前
|
SQL Java 数据库连接
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率。本文介绍了连接池的工作原理、优势及实现方法,并提供了HikariCP的示例代码。
30 3
|
16天前
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
16 1
|
8天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
17天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
4天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
23 9
|
7天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
4天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
21 3
|
6天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。