Java并发编程笔记之Semaphore信号量源码分析

简介: JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?   Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。

JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?

  Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。为了能够一览Semaphore的内部结构,我们首先要看一下Semaphore的类图,类图,如下所示:

5f71589919e7c24d3357bc8f5f14636690cbb5a0

 

 如上类图可以知道Semaphoren内部还是使用AQS来实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,分别代表获取信号量的时候是否采取公平策略。创建Semaphore的时候会有一个变量标示是否使用公平策略,源码如下:


 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new       
        NonfairSync(permits);
    }

   Sync(int permits) {
       setState(permits);
   }


如上面代码所示,Semaphore默认使用的是非公平策略,如果你需要公平策略,则可以使用带两个参数的构造函数来构造Semaphore对象,另外和CountDownLatch一样,构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量,也就是说这里AQS的state值表示当前持有的信号量个数。

 

接下来我们主要看看Semaphore实现的主要方法的源码,如下:

  1.void acquire() 当前线程调用该方法的时候,目的是希望获取一个信号量资源,如果当前信号量计数个数大于 0 ,并且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减少 1 。否则会被放入AQS的阻塞队列,当前线程被挂起,直到其他线程调用了release方法释放了信号量,并且当前线程通过竞争获取到了改信号量。当前线程被其他线程调用了 interrupte()方法中断后,当前线程会抛出 InterruptedException异常返回。源码如下:


public void acquire() throws InterruptedException {
        //传递参数为1,说明要获取1个信号量资源
        sync.acquireSharedInterruptibly(1);
   }
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {

        //(1)如果线程被中断,则抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();

        //(2)否者调用sync子类方法尝试获取,这里根据构造函数确定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //如果获取失败则放入阻塞队列,然后再次尝试如果失败则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
    }


如上代码可知,acquire()内部调用了sync的acquireSharedInterruptibly  方法,后者是对中断响应的(如果当前线程被中断,则抛出中断异常),尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现,所以这里就要分公平性了,这里先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法,源码如下:


protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);

}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
     //获取当前信号量值
     int available = getState();
     //计算当前剩余值
     int remaining = available - acquires;
     //如果当前剩余小于0或者CAS设置成功则返回
     if (remaining < 0 ||
         compareAndSetState(available, remaining))
         return remaining;
    }
}


如上代码,先计算当前信号量值(available)减去需要获取的值(acquires) 得到剩余的信号量个数(remaining),如果剩余值小于 0 说明当前信号量个数满足不了需求,则直接返回负数,然后当前线程会被放入AQS的阻塞队列,当前线程被挂起。如果剩余值大于 0 则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性获取的,是说先调用aquire方法获取信号量的线程不一定比后来者先获取锁。

 

接下来我们要看看公平性的FairSync 类是如何保证公平性的,源码如下:


protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
 }


可以知道公平性还是靠 hasQueuedPredecessors 这个方法来做的,以前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源,如果是则自己放弃获取的权力,然后当前线程会被放入AQS阻塞队列,否则就去获取。hasQueuedPredecessors源码如下:


public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}


如上面代码所示,如果当前线程节点有前驱节点则返回true,否则如果当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false ,其中,如果 h == t 则说明当前队列为空则直接返回 false,如果 h !=t 并且 s == null 说明有一个元素将要作为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操作,首先创建一个哨兵头节点,然后第一个元素插入到哨兵节点后面),那么返回 true,如果  h !=t 并且 s != null 并且  s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。

 

  2.void acquire(int permits) 该方法与 acquire() 不同在与后者只需要获取一个信号量值,而前者则获取指定 permits 个,源码如下:


public void acquire(int permits) throws InterruptedException {
    if (permits < 0) 
      throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


 

  3.void acquireUninterruptibly() 该方法与 acquire() 类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源过程中(包含被阻塞后)其它线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码如下:


public void acquireUninterruptibly() {
     sync.acquireShared(1);
}


 

  4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不同在于该方法对中断不响应。源码如如下:


 public void acquireUninterruptibly(int permits) {
      if (permits < 0) 
           throw new IllegalArgumentException();
      sync.acquireShared(permits);
 }


 

  5.void release() 该方法作用是把当前 semaphore对象的信号量值增加 1 ,如果当前有线程因为调用 acquire 方法被阻塞放入了 AQS的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增加的信号量,源码如下:


public void release() {
        //(1)arg=1
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {

        //(2)尝试释放资源
        if (tryReleaseShared(arg)) {

            //(3)资源释放成功则调用park唤醒AQS队列里面最先挂起的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {

            //(4)获取当前信号量值
            int current = getState();

            //(5)当前信号量值增加releases,这里为增加1
            int next = current + releases;
            if (next < current) // 移除处理
                throw new Error("Maximum permit count exceeded");

            //(6)使用cas保证更新信号量值的原子性
            if (compareAndSetState(current, next))
                return true;
        }
    }


如上面代码可以看到 release()方法中对 sync.releaseShared(1),可以知道release方法每次只会对信号量值增加 1 ,tryReleaseShared方法是无限循环,使用CAS保证了 release 方法对信号量递增 1 的原子性操作。当tryReleaseShared 方法增加信号量成功后会执行代码(3),调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。

 

  6.void release(int permits) 该方法与不带参数的不同之处在于前者每次调用会在信号量值原来基础上增加 permits,而后者每次增加 1。源码如下:


public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}


另外注意到这里调用的是 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会阻塞。

 

到目前已经知道了其原理,接下来用一个例子来加深对Semaphore的理解,例子如下:


package com.hjc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by cong on 2018/7/8.
 */
public class SemaphoreTest {

    // 创建一个Semaphore实例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待子线程执行完毕,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        //关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

类似于 CountDownLatch,上面我们的例子也是在主线程中开启两个子线程进行执行,等所有子线程执行完毕后主线程在继续向下运行。

如上代码首先首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器为 0,然后 main 函数添加两个线程任务到线程池,每个线程内部调用了信号量的 release 方法,相当于计数值递增一,最后在 main 线程里面调用信号量的 acquire 方法,参数传递为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 时才会返回。

看到这里也就明白了,如果构造 Semaphore 时候传递的参数为 N,在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 M+N;

 

对CountDownLatch,CyclicBarrier,Semaphored这三者之间的比较总结:

  1.CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器为 0,而不管当前线程是否结束调用 await 的线程就可以往下执行,相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。

  2.CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者当计数器变为 0 后,就不能在被复用,而前者则使用 reset 方法可以重置后复用,前者对同一个算法但是输入参数不同的类似场景下比较适用。

  3.而 semaphore 采用了信号量递增的策略,一开始并不需要关心需要同步的线程个数,等调用 aquire 时候在指定需要同步个数,并且提供了获取信号量的公平性策略。

目录
相关文章
|
2天前
|
安全 Java 程序员
🚀JAVA异常处理“三部曲”:try-catch-finally带你征服编程的星辰大海!
【6月更文挑战第18天】Java异常处理的`try-catch-finally`是编程探险中的导航系统,确保程序在异常时安全航行。`try`捕获异常,`catch`处理异常,`finally`保证关键清理代码执行。通过实例展示了如何在文件读取中应用这一机制,即使遇到错误也能优雅退出,它是Java程序员征服技术高峰的关键工具。
|
1天前
|
存储 Java 测试技术
滚雪球学Java(66):Java之HashMap详解:深入剖析其底层实现与源码分析
【6月更文挑战第20天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
10 3
滚雪球学Java(66):Java之HashMap详解:深入剖析其底层实现与源码分析
|
22小时前
|
存储 安全 算法
Java并发编程中的线程安全性与性能优化
在Java编程中,特别是涉及并发操作时,线程安全性及其与性能优化是至关重要的问题。本文将深入探讨Java中线程安全的概念及其实现方式,以及如何通过性能优化策略提升程序的并发执行效率。
7 1
|
2天前
|
前端开发 Java 开发者
【编程达人必备】Java高手的秘籍:throw关键字,让异常处理游刃有余!
【6月更文挑战第19天】在Java编程中,熟练运用`throw`关键字是异常处理的关键。通过`throw`,我们可以优雅地处理如商品不存在或价格不匹配等异常情况,避免程序失控。例如,在订单计算中,当遇到问题时,可抛出自定义异常如`PriceMismatchException`。`throw`不仅用于抛出标准异常,还可创建业务相关的异常类型。此外,它允许异常从深层代码传递到上层处理,如在`OrderController`中捕获`calculateTotalAmount`的异常,包装后重新抛出,提供更详细的错误信息。掌握`throw`,能增强程序健壮性,使异常处理变得得心应手。
|
3天前
|
Java 程序员
Java多线程编程是指在一个进程中创建并运行多个线程,每个线程执行不同的任务,并行地工作,以达到提高效率的目的
【6月更文挑战第18天】Java多线程提升效率,通过synchronized关键字、Lock接口和原子变量实现同步互斥。synchronized控制共享资源访问,基于对象内置锁。Lock接口提供更灵活的锁管理,需手动解锁。原子变量类(如AtomicInteger)支持无锁的原子操作,减少性能影响。
16 3
|
1天前
|
安全 Java 调度
Java并发编程:优化多线程应用的性能与安全性
在当今软件开发中,多线程编程已成为不可或缺的一部分,尤其在Java应用程序中更是如此。本文探讨了Java中多线程编程的关键挑战和解决方案,重点介绍了如何通过合理的并发控制和优化策略来提升应用程序的性能和安全性,以及避免常见的并发问题。
10 1
|
1天前
|
Java
【编程侦探社】追踪 Java 线程:一场关于生命周期的侦探故事!
【6月更文挑战第19天】在Java世界中,线程如同神秘角色,编程侦探揭示其生命周期:从新生(`new Thread()`)到就绪(`start()`),面临并发挑战如资源共享冲突。通过`synchronized`实现同步,处理阻塞状态(如等待锁`synchronized (lock) {...}`),最终至死亡,侦探深入理解并解决了多线程谜题,成为编程侦探社的传奇案例。
|
1天前
|
Java
【编程炼金术】Java 线程:从一粒沙到一个世界,生命周期的奇妙转化!
【6月更文挑战第19天】Java线程生命周期始于`Thread`类或`Runnable`接口,经历创建、新生、就绪、运行、阻塞到死亡五态。调用`start()`使线程进入就绪,随后可能获得CPU执行权变为运行态。当阻塞后,线程返回就绪,等待再次执行。理解并管理线程生命周期是优化多线程程序的关键。
|
1天前
|
SQL Java 关系型数据库
Java数据库编程的详细介绍
Java数据库编程的详细介绍
7 1
|
3天前
|
安全 Java 程序员
Java并发编程中的锁机制与优化策略
【6月更文挑战第17天】在Java并发编程的世界中,锁是维护数据一致性和线程安全的关键。本文将深入探讨Java中的锁机制,包括内置锁、显式锁以及读写锁的原理和使用场景。我们将通过实际案例分析锁的优化策略,如减少锁粒度、使用并发容器以及避免死锁的技巧,旨在帮助开发者提升多线程程序的性能和可靠性。