使用递增计数器的线程同步工具 —— 信号量,它的原理是什么样子的?

简介: 在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。

网络异常,图片无法展示
|


前言


在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。


介绍


一个计数信号量。 从概念上讲,信号量维护了一组许可。 如果有必要,在许可可用之前调用 acquire 方法会被阻塞,直到许可证可用。 调用 release 方法会增加了一个许可证,从而释放被阻塞的线程。

  1. 声明时指定初始许可数量。
  2. 调用 acquire(int permits) 方法,指定目标许可数量。
  3. 调用 release(int permits) 方法,发布指定的许可数量。

在许可数量没有到达指定目标数量时,调用 acquire 方法的线程会被阻塞。


基本使用

public class SemaphoreTest1 {
    private static final Semaphore SEMAPHORE = new Semaphore(0);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(1000 + new Random().nextInt(1000));
                } catch (InterruptedException ignored) {
                }
                System.out.println("当前线程: " + Thread.currentThread().getName() + " 发布一个许可");
                SEMAPHORE.release(1);
            });
        }
        System.out.println("-----> 这里是主线程");
        SEMAPHORE.acquire(5);
        System.out.println("-----> 主线程执行完毕");
        pool.shutdown();
    }
}
-----> 这里是主线程
当前线程: Thread-pool-2 发布一个许可
当前线程: Thread-pool-4 发布一个许可
当前线程: Thread-pool-1 发布一个许可
当前线程: Thread-pool-0 发布一个许可
当前线程: Thread-pool-3 发布一个许可
-----> 主线程执行完毕

上面这个方法也是模拟了类似 CountDownLatch 的用法, 在子线程执行完毕之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:


Semaphore 是从指定数值开始增加,直到到达许可数量,然后被阻塞线程开始继续执行。


CountDownLatch 是从指定数量的线程开始减少,直到为 0 时,被阻塞的线程开始继续执行。


当然这只是最简单的用法,除此让主线程等待,同样也可以让其他线程等待,然后再开始执行。


问题疑问

  1. Semaphore 和 AQS 有什么关系?
  2. Semaphore 和 CountDownLatch 有什么区别?


源码分析


基本结构

网络异常,图片无法展示
|

通过类图可以看出在 Semaphore 里面有一个静态内部类 Sync 继承了 AQS,同时为了区分公平和非公平的情况,Sync 分别有两个子类:NonfairSync 、FairSync。

下面根据案例分别从构造函数、acquire()、release() 入手,从而了解内部实现原理。


初始化

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


初始化默认非公平锁, 同时需要传入指定许可数, 可以看到这块代码是调用的 AQS 的 setState(permits) 方法。代码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
}
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        Sync(int permits) {
            setState(permits);
        }
 }


setState 方法其实就是对 AQS 的 state 进行赋值。

补充

  1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程获得锁,大于等于 1 已经有线程获得锁,大于 1 说明该获得锁的线程多次重入。
  2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
  3. 而在这里 (CountDownLatch)则代表门闩或者说计数的值。

如果对 state 有所遗忘,可以阅读前面的 AQS 、CAS 相关代码。 state 在这里代表的是信号量的许可数量。


acquire()

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 方法,只不过一个支持传递参数,一个默认为 1。

acquireSharedInterruptibly 方法,其实就是 Sync 继承自 AQS 的。


这块可以阅读 AQS 的文章,这里简单介绍下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 在失败后会使用 doAcquireSharedInterruptibly(arg); 不断获取资源;
  2. final Node node = addWaiter(Node.SHARED); 会创建节点以共享模式放到队列里;
  3. 在循环中不断判断前一个节点,如果是 head,则尝试获取共享资源;
  4. 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。


tryAcquireShared 是需要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做讲解:

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 如果前面有节点,则直接返回 -1 表示失败
            if (hasQueuedPredecessors())
                return -1;
            // 获取当前信号量
            int available = getState();
            // 获取当前剩余量
            int remaining = available - acquires;
            // 如果小于 0 或者 CAS 设置信号量成功 则直接返回
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

而这段代码的含义:

  1. 如果前面有节点,则直接阻塞;
  2. 如果当前剩余信号量小于 0 ,则返回负值,直接阻塞;
  3. 如果当前剩余量大于等于 0 ,会 CAS 更新信号量,并返回非负数。

这块数值的含义,在 AQS 中定义了,含义如下:

  1. 小于 0: 表示失败;
  2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
  3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。


release()

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}


发布许可证的给定数量,该数量增加可用的许可数量。 看其内部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


如果实现tryReleaseShared返回true,以共享模式释放资源。 其中的 tryReleaseShared 部分由 Semaphore.Sync 中实现,逻辑如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前 state
        int current = getState();
        // 对 state 进行增加
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 使用 CAS 赋值
        if (compareAndSetState(current, next))
            return true;
    }
}


通过上面代码可以看出,在 Semaphore 的 release 方法中主要就是对 state 进行增加,增加成功后会调用 AQS 的 doReleaseShared 方法唤醒头节点。


总结


Q&A


Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含义代表什么?

A: 在 Semaphore 中 state 代表许可数量,acquire 方法当许可小于指定数量会阻塞线程,release 方法增加许可当许可增加成功则唤醒阻塞节点。


Q: Semaphore 基于 AQS 具体是怎么实现的呢?

A:

  1. 初始设置 state 的初始值,即初始许可数量。
  2. acquire 方法设置目标数量,当目标数量大于当前数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
  3. release 对 state 进行增加,成功后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。


Q: Semaphore 和 CountDownLatch 有什么区别?

A: Semaphore 的计数器是递加的,而 CountDownLatch 是递减的。相同点就是计数器都不可以重置。


结束语


在阅读 Semaphore 源码过程中,发现其主要功能都是基于 AQS 实现的,可以回顾阅读 AQS 的相关笔记。同样 Semaphore 也支持公平和非公平模式,这块就需要小伙伴自己去阅读啦。

目录
相关文章
|
2月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
184 1
|
2月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
154 2
|
7月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
255 0
|
4月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
280 1
|
6月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
260 0
|
10月前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
11月前
|
Java Linux 调度
硬核揭秘:线程与进程的底层原理,面试高分必备!
嘿,大家好!我是小米,29岁的技术爱好者。今天来聊聊线程和进程的区别。进程是操作系统中运行的程序实例,有独立内存空间;线程是进程内的最小执行单元,共享内存。创建进程开销大但更安全,线程轻量高效但易引发数据竞争。面试时可强调:进程是资源分配单位,线程是CPU调度单位。根据不同场景选择合适的并发模型,如高并发用线程池。希望这篇文章能帮你更好地理解并回答面试中的相关问题,祝你早日拿下心仪的offer!
247 6
|
安全 程序员 API
|
存储 安全 UED
多线程在打包工具中的运用
【11月更文挑战第2天】本文介绍了多线程技术在打包工具中的应用,包括提高打包效率、优化用户体验和多线程安全考虑。通过并行处理文件和加速资源收集,多线程可以显著缩短打包时间。在用户体验方面,多线程使界面保持响应,并支持优先级处理。此外,文章还讨论了资源访问冲突和死锁预防的解决方案,确保多线程环境下的稳定性和安全性。
152 1
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
164 0

热门文章

最新文章