使用递增计数器的线程同步工具 —— 信号量,它的原理是什么样子的?

简介: 在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。

网络异常,图片无法展示
|


前言


在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。


介绍


一个计数信号量。 从概念上讲,信号量维护了一组许可。 如果有必要,在许可可用之前调用 acquire 方法会被阻塞,直到许可证可用。 调用 release 方法会增加了一个许可证,从而释放被阻塞的线程。

  1. 声明时指定初始许可数量。
  2. 调用 acquire(int permits) 方法,指定目标许可数量。
  3. 调用 release(int permits) 方法,发布指定的许可数量。

在许可数量没有到达指定目标数量时,调用 acquire 方法的线程会被阻塞。


基本使用

public class SemaphoreTest1 {
    private static final Semaphore SEMAPHORE = new Semaphore(0);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(1000 + new Random().nextInt(1000));
                } catch (InterruptedException ignored) {
                }
                System.out.println("当前线程: " + Thread.currentThread().getName() + " 发布一个许可");
                SEMAPHORE.release(1);
            });
        }
        System.out.println("-----> 这里是主线程");
        SEMAPHORE.acquire(5);
        System.out.println("-----> 主线程执行完毕");
        pool.shutdown();
    }
}
-----> 这里是主线程
当前线程: Thread-pool-2 发布一个许可
当前线程: Thread-pool-4 发布一个许可
当前线程: Thread-pool-1 发布一个许可
当前线程: Thread-pool-0 发布一个许可
当前线程: Thread-pool-3 发布一个许可
-----> 主线程执行完毕

上面这个方法也是模拟了类似 CountDownLatch 的用法, 在子线程执行完毕之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:


Semaphore 是从指定数值开始增加,直到到达许可数量,然后被阻塞线程开始继续执行。


CountDownLatch 是从指定数量的线程开始减少,直到为 0 时,被阻塞的线程开始继续执行。


当然这只是最简单的用法,除此让主线程等待,同样也可以让其他线程等待,然后再开始执行。


问题疑问

  1. Semaphore 和 AQS 有什么关系?
  2. Semaphore 和 CountDownLatch 有什么区别?


源码分析


基本结构

网络异常,图片无法展示
|

通过类图可以看出在 Semaphore 里面有一个静态内部类 Sync 继承了 AQS,同时为了区分公平和非公平的情况,Sync 分别有两个子类:NonfairSync 、FairSync。

下面根据案例分别从构造函数、acquire()、release() 入手,从而了解内部实现原理。


初始化

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


初始化默认非公平锁, 同时需要传入指定许可数, 可以看到这块代码是调用的 AQS 的 setState(permits) 方法。代码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
 }


setState 方法其实就是对 AQS 的 state 进行赋值。

补充

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程获得锁,大于等于 1 已经有线程获得锁,大于 1 说明该获得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
  3. 而在这里 (CountDownLatch)则代表门闩或者说计数的值。

如果对 state 有所遗忘,可以阅读前面的 AQS 、CAS 相关代码。 state 在这里代表的是信号量的许可数量。


acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 方法,只不过一个支持传递参数,一个默认为 1。

acquireSharedInterruptibly 方法,其实就是 Sync 继承自 AQS 的。


这块可以阅读 AQS 的文章,这里简单介绍下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 在失败后会使用 doAcquireSharedInterruptibly(arg); 不断获取资源;
  2. final Node node = addWaiter(Node.SHARED); 会创建节点以共享模式放到队列里;
  3. 在循环中不断判断前一个节点,如果是 head,则尝试获取共享资源;
  4. 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。


tryAcquireShared 是需要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做讲解:

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果前面有节点,则直接返回 -1 表示失败
            if (hasQueuedPredecessors())
                return -1;
            // 获取当前信号量
            int available = getState();
            // 获取当前剩余量
            int remaining = available - acquires;
            // 如果小于 0 或者 CAS 设置信号量成功 则直接返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

而这段代码的含义:

  1. 如果前面有节点,则直接阻塞;
  2. 如果当前剩余信号量小于 0 ,则返回负值,直接阻塞;
  3. 如果当前剩余量大于等于 0 ,会 CAS 更新信号量,并返回非负数。

这块数值的含义,在 AQS 中定义了,含义如下:

  1. 小于 0: 表示失败;
  2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
  3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。


release()

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}


发布许可证的给定数量,该数量增加可用的许可数量。 看其内部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


如果实现tryReleaseShared返回true,以共享模式释放资源。 其中的 tryReleaseShared 部分由 Semaphore.Sync 中实现,逻辑如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前 state
        int current = getState();
        // 对 state 进行增加
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 使用 CAS 赋值
        if (compareAndSetState(current, next))
            return true;
    }
}


通过上面代码可以看出,在 Semaphore 的 release 方法中主要就是对 state 进行增加,增加成功后会调用 AQS 的 doReleaseShared 方法唤醒头节点。


总结


Q&A


Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含义代表什么?

A: 在 Semaphore 中 state 代表许可数量,acquire 方法当许可小于指定数量会阻塞线程,release 方法增加许可当许可增加成功则唤醒阻塞节点。


Q: Semaphore 基于 AQS 具体是怎么实现的呢?

A:

  1. 初始设置 state 的初始值,即初始许可数量。
  2. acquire 方法设置目标数量,当目标数量大于当前数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
  3. release 对 state 进行增加,成功后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。


Q: Semaphore 和 CountDownLatch 有什么区别?

A: Semaphore 的计数器是递加的,而 CountDownLatch 是递减的。相同点就是计数器都不可以重置。


结束语


在阅读 Semaphore 源码过程中,发现其主要功能都是基于 AQS 实现的,可以回顾阅读 AQS 的相关笔记。同样 Semaphore 也支持公平和非公平模式,这块就需要小伙伴自己去阅读啦。

目录
相关文章
|
14天前
|
存储 Java C++
Java虚拟机(JVM)管理内存划分为多个区域:程序计数器记录线程执行位置;虚拟机栈存储线程私有数据
Java虚拟机(JVM)管理内存划分为多个区域:程序计数器记录线程执行位置;虚拟机栈存储线程私有数据,如局部变量和操作数;本地方法栈支持native方法;堆存放所有线程的对象实例,由垃圾回收管理;方法区(在Java 8后变为元空间)存储类信息和常量;运行时常量池是方法区一部分,保存符号引用和常量;直接内存非JVM规范定义,手动管理,通过Buffer类使用。Java 8后,永久代被元空间取代,G1成为默认GC。
23 2
|
3天前
|
存储 监控 Java
理解线程池的原理与最佳实践
理解线程池的原理与最佳实践
|
5天前
|
存储 监控 Java
理解线程池的原理与最佳实践
理解线程池的原理与最佳实践
|
11天前
|
存储 设计模式 并行计算
CopyOnWriteArrayList:深入理解Java中的线程安全List原理和应用
CopyOnWriteArrayList:深入理解Java中的线程安全List原理和应用
|
11天前
|
缓存 监控 Java
深入Elasticsearch:线程池的原理与应用
深入Elasticsearch:线程池的原理与应用
|
11天前
|
Java
线程池ThreadPoolExcutor源码剖析---工作原理
线程池ThreadPoolExcutor源码剖析---工作原理
|
2月前
|
监控 Java 调度
Java并发编程:线程池的原理与实践
【5月更文挑战第30天】 在现代软件开发中,尤其是Java应用中,并发编程是一个不可忽视的领域。线程池作为提升应用性能和资源利用率的关键技术之一,其正确使用和优化对系统稳定性和效率至关重要。本文将深入探讨线程池的核心原理、常见类型以及在实际开发中的使用案例,旨在帮助开发者更好地理解和运用线程池技术,构建高性能的Java应用程序。
|
2月前
|
安全 Java 编译器
Java 多线程系列Ⅴ(常见锁策略+CAS+synchronized原理)
Java 多线程系列Ⅴ(常见锁策略+CAS+synchronized原理)
|
24天前
|
缓存 算法 Java
深入解析线程上下文切换的原理与优化策略
深入解析线程上下文切换的原理与优化策略
35 0
|
2月前
|
监控 Java 开发者
深入理解Java并发编程:线程池的工作原理与实践
【5月更文挑战第29天】 在现代Java应用开发中,高效地管理并发任务是至关重要的。本文将深入探讨Java线程池的核心机制,揭示其背后的设计哲学和运作模式。通过分析线程池的优势、工作过程及关键参数,结合实例演示如何合理配置和优化线程池以提高应用程序的性能和响应能力。