Semaphore:实现一个限流器

简介: Semaphore:实现一个限流器


Semaphore 现在普遍翻译成 “信号量”,从概念上讲信号量维护着一组 “凭证”,获取到凭证的线程才能访问资源,使用完成后释放, 我们可以使用信号量来限制访问特定资源的并发线程数。

就像现实生活中的停车场车位,当有空位的时候才能放车子进入,不然就只能等待,出来的车子则释放凭证。

信号量模型

可以简单的概括为:一个计数器、一个等待队列、三个方法。 在信号量模型里,计数器和等待队列对外是透明的,只能通过信号量模型提供的三个方法访问它们,init()、acquire()、release()

  • init(): 设置计数器的初始值,初始化凭证数量。可以理解为停车场的车位数量。
  • acquire():计数器的值减 1 ;如果此时计数器的值小于 0,则当前线程将被阻塞,放到等待队列之中,否则当前线程可以继续执行。
  • release():计数器值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

这里提到的 init()、acquire()、release() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

通过一个简化版的信号模型代码便于理解:

public class Semaphore {
    //计数器
    private int count;
    //保存线程的等待队列
    private Queue queue;
    /**
     * 初始化计数器
     * @param count
     */
    public Semaphore(int count) {
        this.count = count;
    }
    /**
     * 获取凭证
     */
    public void acquire(){
        this.count--;
        if(this.count<0){
            // 将当前线程插入等待队列
            // 阻塞当前线程
        }
    }
    /**
     * 释放凭证
     */
    public void release(){
        this.count++;
        if(this.count >= 0) {
            // 移除等待队列中的某个线程 T
            // 唤醒线程 T
        }
    }
}

信号量的使用

通过上文我们了解到信号量模型原理,接下来则看如何在实际场景中使用。这里我们还是用累加器的例子来说明信号量的使用吧。在累加器的例子里面,count++ 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。

public class TestSemaPhore {
    private static int count;
    //初始化信号量为 1
    private static final Semaphore semaphore = new Semaphore(1);
    public static void addOne() throws InterruptedException {
        //使用信号量保证互斥,只有一个线程进入
        semaphore.acquire();
        try {
            count++;
        } finally {
            semaphore.release();
        }
    }
    public static int getCount() {
        return count;
    }
    public static void main(String[] args) throws InterruptedException {
        //模拟十个线程同时访问
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    addOne();
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        countDownLatch.countDown();
        TimeUnit.SECONDS.sleep(3);
        int count = getCount();
        System.out.println(count);
    }
}

我们来分析下信号量如何保证互斥的。

假设两个线程 T1 和 T2 同时访问 addOne(),当他们都调用semaphore.acquire();的时候,由于这是一个原子操作,所以只有一个线程能把信号量计数器减为 0,另外一个线程 T2 则是将计数器减为 -1。对应线程 T1 计数器的值为 0 ,满足大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量计数器的值为 -1,小于 0 ,按照我们之前的信号量模型 acquire()描述,线程 T2 将被阻塞进入等待队列。所以此刻只有线程 T1 进入临界区执行 count++

当前信号量计数器的值为 -1 ,当线程 T1 执行 semaphore.release()操作执行完后 计数器 +1 则变成了 0,满足小于等于 0,按照模型的定义,此刻等待队列中的 T2 将会被唤醒,于是 T2 在 T1 执行完临界区代码后才获得进入代码领截取的机会,从而保证了互斥性。

实现一个限流器

上面的例子我们利用信号量实现了一个简单的互斥锁,你会不会觉得奇怪,既然 Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实 Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区

常见的就是池化资源,比如连接池、对象池、线程池等。比如熟悉的数据库连接池,在同一时刻允许多个线程同时使用连接,当然在每个连接被释放之前,是允许其他线程使用的。

现在我们假设有一个场景,对象池需求,就是一次性创建 N 哥对象,之后所有的线程都复用这 N 个对象,在对象被释放前,是不允许其他线程使用。

/**
 * 对象池
 *
 */
public class ObjectPool {
    //使用 阻塞队列保存对象池
    private final ArrayBlockingQueue<InputSaleMapDO> pool;
    //信号量
    private final Semaphore sem;
    /**
     * 初始化对象池
     *
     * @param size 池大小
     */
    public ObjectPool(int size) {
        pool = new ArrayBlockingQueue<>(size);
        sem = new Semaphore(size);
        for (int i = 0; i < size; i++) {
            InputSaleMapDO inputSaleMapDO = new InputSaleMapDO();
            inputSaleMapDO.setId((long) i);
            pool.add(inputSaleMapDO);
        }
    }
    //利用对象池的对象调用 function
    public Long run(Function<InputSaleMapDO, Long> function) throws InterruptedException {
        InputSaleMapDO obj = null;
        sem.acquire();
        try {
            obj = pool.poll();
            return function.apply(obj);
        } finally {
            pool.add(obj);
            sem.release();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ObjectPool objectPool = new ObjectPool(2);
        //模拟十个线程同时访问
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    objectPool.run(f -> {
                        System.out.println(f);
                        return f.getId();
                    });
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        countDownLatch.countDown();
        TimeUnit.SECONDS.sleep(30);
    }
}

初始化线程池大小 2 ,我们模拟 10 个线程,每次只能两个线程分配对象 InputSaleMapDO。

执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(obj) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

思考

上面的例子中 保存对象池使用了 ArrayBlockingQueue ,是一个线程安全的容器,那么是否可以换成 ArrayList?欢迎后台给出答案。还有假设是停车场的车位作为对象池,车主停车是不是也可以使用 Semaphore 实现?


相关文章
|
搜索推荐 前端开发 架构师
阿里高级技术专家谈开源DDD框架:COLA4.0,分离架构和组件(上)
阿里高级技术专家谈开源DDD框架:COLA4.0,分离架构和组件(上)
2802 0
阿里高级技术专家谈开源DDD框架:COLA4.0,分离架构和组件(上)
|
传感器 数据采集 算法
python实现ModBusRTU客户端方式
python实现基于串口通信的ModBusRTU客户端是一件简单的事情,只要通过pymodbus模块就可以实现。
|
11月前
|
定位技术 Python
Matplotlib 教程 之 Matplotlib imshow() 方法 1
《Matplotlib imshow() 方法教程》:本文介绍 Matplotlib 库中的 imshow() 函数,该函数常用于绘制二维灰度或彩色图像,也可用于展示矩阵、热力图等。文中详细解释了其语法及参数,例如颜色映射(cmap)、归一化(norm)等,并通过实例演示了如何使用 imshow() 显示灰度图像。
257 2
|
分布式计算 大数据 API
|
JavaScript 前端开发 API
Vue 3的响应式系统相比Vue 2有哪些改进?
Vue 3 响应式系统升级亮点:使用 Proxy 替换 `Object.defineProperty`,实现更细粒度的变更跟踪与高性能;自动追踪嵌套属性,无需 `$set` 或深度监听;支持懒响应式,提升初始化性能;改进数组响应式,直接使用原生数组方法;递归侦听器增强灵活性;静态属性追踪;自定义响应式容器;统一 `ref` 和 `reactive` API;引入 `toRefs` 函数;优化响应式 API,如 `markRaw`, `shallowRef` 等,大幅提升效率和开发体验。
|
缓存 安全 Java
Java中函数式接口详解
Java 8引入函数式接口,支持函数式编程。这些接口有单一抽象方法,可与Lambda表达式结合,简化代码。常见函数式接口包括:`Function&lt;T, R&gt;`用于转换操作,`Predicate&lt;T&gt;`用于布尔判断,`Consumer&lt;T&gt;`用于消费输入,`Supplier&lt;T&gt;`用于无参生成结果。开发者也可自定义函数式接口。Lambda表达式使实现接口更简洁。注意异常处理和线程安全。函数式接口广泛应用于集合操作、并行编程和事件处理。提升代码可读性和效率,是现代Java开发的重要工具。
256 0
|
算法 Java Sentinel
限流算法(计数器、滑动时间窗口、漏斗、令牌)原理以及代码实现
> 本文会对这4个限流算法进行详细说明,并输出实现限流算法的代码示例。 > 代码是按照自己的理解写的,很简单的实现了功能,还请大佬们多多交流找bug。
1475 0
|
JSON 前端开发 Java
SpringBoot之JSON参数,路径参数的详细解析
SpringBoot之JSON参数,路径参数的详细解析
502 0
|
JSON JavaScript 应用服务中间件
关于The valid characters are defined in RFC 7230 and RFC 3986问题
建议从目前的角度出发使用第三种方式降低tomcat版本就可以了,如果从长远出发的话,建议遵循RFC 7230 and RFC 3986规范,对于非保留字字符(json格式的请求参数)做转义操作。
2292 0
关于The valid characters are defined in RFC 7230 and RFC 3986问题
|
前端开发 JavaScript 开发工具
实现一个 Code Pen:(三)10 行代码实现代码格式化
在上文中,我们使用 monaco-editor 结合 Next.js,打造了编辑器的功能,在本文中,我们将继续优化 monaco-editor, 使它拥有代码格式化的功能。
1102 0
实现一个 Code Pen:(三)10 行代码实现代码格式化