使用递增计数器的线程同步工具 —— 信号量,它的原理是什么样子的?

简介: 在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。

网络异常,图片无法展示
|


前言


在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。


介绍


一个计数信号量。 从概念上讲,信号量维护了一组许可。 如果有必要,在许可可用之前调用 acquire 方法会被阻塞,直到许可证可用。 调用 release 方法会增加了一个许可证,从而释放被阻塞的线程。

  1. 声明时指定初始许可数量。
  2. 调用 acquire(int permits) 方法,指定目标许可数量。
  3. 调用 release(int permits) 方法,发布指定的许可数量。

在许可数量没有到达指定目标数量时,调用 acquire 方法的线程会被阻塞。


基本使用

public class SemaphoreTest1 {
    private static final Semaphore SEMAPHORE = new Semaphore(0);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(1000 + new Random().nextInt(1000));
                } catch (InterruptedException ignored) {
                }
                System.out.println("当前线程: " + Thread.currentThread().getName() + " 发布一个许可");
                SEMAPHORE.release(1);
            });
        }
        System.out.println("-----> 这里是主线程");
        SEMAPHORE.acquire(5);
        System.out.println("-----> 主线程执行完毕");
        pool.shutdown();
    }
}
-----> 这里是主线程
当前线程: Thread-pool-2 发布一个许可
当前线程: Thread-pool-4 发布一个许可
当前线程: Thread-pool-1 发布一个许可
当前线程: Thread-pool-0 发布一个许可
当前线程: Thread-pool-3 发布一个许可
-----> 主线程执行完毕

上面这个方法也是模拟了类似 CountDownLatch 的用法, 在子线程执行完毕之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:


Semaphore 是从指定数值开始增加,直到到达许可数量,然后被阻塞线程开始继续执行。


CountDownLatch 是从指定数量的线程开始减少,直到为 0 时,被阻塞的线程开始继续执行。


当然这只是最简单的用法,除此让主线程等待,同样也可以让其他线程等待,然后再开始执行。


问题疑问

  1. Semaphore 和 AQS 有什么关系?
  2. Semaphore 和 CountDownLatch 有什么区别?


源码分析


基本结构

网络异常,图片无法展示
|

通过类图可以看出在 Semaphore 里面有一个静态内部类 Sync 继承了 AQS,同时为了区分公平和非公平的情况,Sync 分别有两个子类:NonfairSync 、FairSync。

下面根据案例分别从构造函数、acquire()、release() 入手,从而了解内部实现原理。


初始化

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


初始化默认非公平锁, 同时需要传入指定许可数, 可以看到这块代码是调用的 AQS 的 setState(permits) 方法。代码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
 }


setState 方法其实就是对 AQS 的 state 进行赋值。

补充

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程获得锁,大于等于 1 已经有线程获得锁,大于 1 说明该获得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
  3. 而在这里 (CountDownLatch)则代表门闩或者说计数的值。

如果对 state 有所遗忘,可以阅读前面的 AQS 、CAS 相关代码。 state 在这里代表的是信号量的许可数量。


acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 方法,只不过一个支持传递参数,一个默认为 1。

acquireSharedInterruptibly 方法,其实就是 Sync 继承自 AQS 的。


这块可以阅读 AQS 的文章,这里简单介绍下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 在失败后会使用 doAcquireSharedInterruptibly(arg); 不断获取资源;
  2. final Node node = addWaiter(Node.SHARED); 会创建节点以共享模式放到队列里;
  3. 在循环中不断判断前一个节点,如果是 head,则尝试获取共享资源;
  4. 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。


tryAcquireShared 是需要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做讲解:

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果前面有节点,则直接返回 -1 表示失败
            if (hasQueuedPredecessors())
                return -1;
            // 获取当前信号量
            int available = getState();
            // 获取当前剩余量
            int remaining = available - acquires;
            // 如果小于 0 或者 CAS 设置信号量成功 则直接返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

而这段代码的含义:

  1. 如果前面有节点,则直接阻塞;
  2. 如果当前剩余信号量小于 0 ,则返回负值,直接阻塞;
  3. 如果当前剩余量大于等于 0 ,会 CAS 更新信号量,并返回非负数。

这块数值的含义,在 AQS 中定义了,含义如下:

  1. 小于 0: 表示失败;
  2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
  3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。


release()

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}


发布许可证的给定数量,该数量增加可用的许可数量。 看其内部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


如果实现tryReleaseShared返回true,以共享模式释放资源。 其中的 tryReleaseShared 部分由 Semaphore.Sync 中实现,逻辑如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前 state
        int current = getState();
        // 对 state 进行增加
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 使用 CAS 赋值
        if (compareAndSetState(current, next))
            return true;
    }
}


通过上面代码可以看出,在 Semaphore 的 release 方法中主要就是对 state 进行增加,增加成功后会调用 AQS 的 doReleaseShared 方法唤醒头节点。


总结


Q&A


Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含义代表什么?

A: 在 Semaphore 中 state 代表许可数量,acquire 方法当许可小于指定数量会阻塞线程,release 方法增加许可当许可增加成功则唤醒阻塞节点。


Q: Semaphore 基于 AQS 具体是怎么实现的呢?

A:

  1. 初始设置 state 的初始值,即初始许可数量。
  2. acquire 方法设置目标数量,当目标数量大于当前数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
  3. release 对 state 进行增加,成功后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。


Q: Semaphore 和 CountDownLatch 有什么区别?

A: Semaphore 的计数器是递加的,而 CountDownLatch 是递减的。相同点就是计数器都不可以重置。


结束语


在阅读 Semaphore 源码过程中,发现其主要功能都是基于 AQS 实现的,可以回顾阅读 AQS 的相关笔记。同样 Semaphore 也支持公平和非公平模式,这块就需要小伙伴自己去阅读啦。

目录
相关文章
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
1天前
|
存储 安全 UED
多线程在打包工具中的运用
【11月更文挑战第2天】本文介绍了多线程技术在打包工具中的应用,包括提高打包效率、优化用户体验和多线程安全考虑。通过并行处理文件和加速资源收集,多线程可以显著缩短打包时间。在用户体验方面,多线程使界面保持响应,并支持优先级处理。此外,文章还讨论了资源访问冲突和死锁预防的解决方案,确保多线程环境下的稳定性和安全性。
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
115 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
27天前
|
运维 API 计算机视觉
深度解密协程锁、信号量以及线程锁的实现原理
深度解密协程锁、信号量以及线程锁的实现原理
30 1
|
25天前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
42 0
|
25天前
|
Java 应用服务中间件 API
nginx线程池原理
nginx线程池原理
24 0
|
2月前
|
Java 数据中心 微服务
Java高级知识:线程池隔离与信号量隔离的实战应用
在Java并发编程中,线程池隔离与信号量隔离是两种常用的资源隔离技术,它们在提高系统稳定性、防止系统过载方面发挥着重要作用。
23 0
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
3月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
3月前
|
安全 C++
利用信号量实现线程顺序执行
【8月更文挑战第25天】信号量是多线程编程中用于控制共享资源访问的关键同步机制,能有效保证线程按预设顺序执行。实现方法包括:引入相关头文件(如 C++ 中的 `&lt;semaphore.h&gt;`),创建信号量并通过 `sem_init` 设置初始值;在各线程函数中运用 `sem_post` 与 `sem_wait` 来传递执行权;最后,通过 `sem_destroy` 销毁信号量以释放资源。使用过程中需注意错误处理、确保线程安全及合理设定信号量初值,以维持程序稳定性和高效性。

相关实验场景

更多