Java并发编程笔记之Semaphore信号量源码分析

简介: JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?   Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。

JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?

  Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。为了能够一览Semaphore的内部结构,我们首先要看一下Semaphore的类图,类图,如下所示:


 

 如上类图可以知道Semaphoren内部还是使用AQS来实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,分别代表获取信号量的时候是否采取公平策略。创建Semaphore的时候会有一个变量标示是否使用公平策略,源码如下:


 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new       
        NonfairSync(permits);
    }

   Sync(int permits) {
       setState(permits);
   }


如上面代码所示,Semaphore默认使用的是非公平策略,如果你需要公平策略,则可以使用带两个参数的构造函数来构造Semaphore对象,另外和CountDownLatch一样,构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量,也就是说这里AQS的state值表示当前持有的信号量个数。

 

接下来我们主要看看Semaphore实现的主要方法的源码,如下:

  1.void acquire() 当前线程调用该方法的时候,目的是希望获取一个信号量资源,如果当前信号量计数个数大于 0 ,并且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减少 1 。否则会被放入AQS的阻塞队列,当前线程被挂起,直到其他线程调用了release方法释放了信号量,并且当前线程通过竞争获取到了改信号量。当前线程被其他线程调用了 interrupte()方法中断后,当前线程会抛出 InterruptedException异常返回。源码如下:


public void acquire() throws InterruptedException {
        //传递参数为1,说明要获取1个信号量资源
        sync.acquireSharedInterruptibly(1);
   }
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {

        //(1)如果线程被中断,则抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();

        //(2)否者调用sync子类方法尝试获取,这里根据构造函数确定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //如果获取失败则放入阻塞队列,然后再次尝试如果失败则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
    }


如上代码可知,acquire()内部调用了sync的acquireSharedInterruptibly  方法,后者是对中断响应的(如果当前线程被中断,则抛出中断异常),尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现,所以这里就要分公平性了,这里先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法,源码如下:


protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);

}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
     //获取当前信号量值
     int available = getState();
     //计算当前剩余值
     int remaining = available - acquires;
     //如果当前剩余小于0或者CAS设置成功则返回
     if (remaining < 0 ||
         compareAndSetState(available, remaining))
         return remaining;
    }
}


如上代码,先计算当前信号量值(available)减去需要获取的值(acquires) 得到剩余的信号量个数(remaining),如果剩余值小于 0 说明当前信号量个数满足不了需求,则直接返回负数,然后当前线程会被放入AQS的阻塞队列,当前线程被挂起。如果剩余值大于 0 则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性获取的,是说先调用aquire方法获取信号量的线程不一定比后来者先获取锁。

 

接下来我们要看看公平性的FairSync 类是如何保证公平性的,源码如下:


protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
 }


可以知道公平性还是靠 hasQueuedPredecessors 这个方法来做的,以前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源,如果是则自己放弃获取的权力,然后当前线程会被放入AQS阻塞队列,否则就去获取。hasQueuedPredecessors源码如下:


public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}


如上面代码所示,如果当前线程节点有前驱节点则返回true,否则如果当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false ,其中,如果 h == t 则说明当前队列为空则直接返回 false,如果 h !=t 并且 s == null 说明有一个元素将要作为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操作,首先创建一个哨兵头节点,然后第一个元素插入到哨兵节点后面),那么返回 true,如果  h !=t 并且 s != null 并且  s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。

 

  2.void acquire(int permits) 该方法与 acquire() 不同在与后者只需要获取一个信号量值,而前者则获取指定 permits 个,源码如下:


public void acquire(int permits) throws InterruptedException {
    if (permits < 0) 
      throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


 

  3.void acquireUninterruptibly() 该方法与 acquire() 类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源过程中(包含被阻塞后)其它线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码如下:


public void acquireUninterruptibly() {
     sync.acquireShared(1);
}


 

  4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不同在于该方法对中断不响应。源码如如下:


 public void acquireUninterruptibly(int permits) {
      if (permits < 0) 
           throw new IllegalArgumentException();
      sync.acquireShared(permits);
 }


 

  5.void release() 该方法作用是把当前 semaphore对象的信号量值增加 1 ,如果当前有线程因为调用 acquire 方法被阻塞放入了 AQS的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增加的信号量,源码如下:


public void release() {
        //(1)arg=1
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {

        //(2)尝试释放资源
        if (tryReleaseShared(arg)) {

            //(3)资源释放成功则调用park唤醒AQS队列里面最先挂起的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {

            //(4)获取当前信号量值
            int current = getState();

            //(5)当前信号量值增加releases,这里为增加1
            int next = current + releases;
            if (next < current) // 移除处理
                throw new Error("Maximum permit count exceeded");

            //(6)使用cas保证更新信号量值的原子性
            if (compareAndSetState(current, next))
                return true;
        }
    }


如上面代码可以看到 release()方法中对 sync.releaseShared(1),可以知道release方法每次只会对信号量值增加 1 ,tryReleaseShared方法是无限循环,使用CAS保证了 release 方法对信号量递增 1 的原子性操作。当tryReleaseShared 方法增加信号量成功后会执行代码(3),调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。

 

  6.void release(int permits) 该方法与不带参数的不同之处在于前者每次调用会在信号量值原来基础上增加 permits,而后者每次增加 1。源码如下:


public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}


另外注意到这里调用的是 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会阻塞。

 

到目前已经知道了其原理,接下来用一个例子来加深对Semaphore的理解,例子如下:


package com.hjc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by cong on 2018/7/8.
 */
public class SemaphoreTest {

    // 创建一个Semaphore实例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待子线程执行完毕,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        //关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

类似于 CountDownLatch,上面我们的例子也是在主线程中开启两个子线程进行执行,等所有子线程执行完毕后主线程在继续向下运行。

如上代码首先首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器为 0,然后 main 函数添加两个线程任务到线程池,每个线程内部调用了信号量的 release 方法,相当于计数值递增一,最后在 main 线程里面调用信号量的 acquire 方法,参数传递为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 时才会返回。

看到这里也就明白了,如果构造 Semaphore 时候传递的参数为 N,在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 M+N;

 

对CountDownLatch,CyclicBarrier,Semaphored这三者之间的比较总结:

  1.CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器为 0,而不管当前线程是否结束调用 await 的线程就可以往下执行,相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。

  2.CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者当计数器变为 0 后,就不能在被复用,而前者则使用 reset 方法可以重置后复用,前者对同一个算法但是输入参数不同的类似场景下比较适用。

  3.而 semaphore 采用了信号量递增的策略,一开始并不需要关心需要同步的线程个数,等调用 aquire 时候在指定需要同步个数,并且提供了获取信号量的公平性策略。

目录
相关文章
|
18天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
22天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
61 12
|
19天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
105 2
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
Java
java多线程之闭锁(CountDownLatch)、同步屏幕(CyclicBarrier)、信号量(Semaphore)
闭锁CountDownLatch 若有多条线程,其中一条线程需要等到其他所有线程准备完所需的资源后才能运行,这样的情况可以使用闭锁。 import java.util.concurrent.
897 0
|
Java API
java多线程--信号量Semaphore的使用
  Semaphore可以控制某个共享资源可被同时访问的次数,即可以维护当前访问某一共享资源的线程个数,并提供了同步机制.例如控制某一个文件允许的并发访问的数量.   例如网吧里有100台机器,那么最多只能提供100个人同时上网,当来了第101个客人的时候,就需要等着,一旦有一个人人下机,就可以立马得到了个空机位补上去.
1156 0
|
6天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
44 17
|
16天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者