Java并发编程笔记之Semaphore信号量源码分析

简介: JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?   Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。

JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同在于它内部的计数器是递增的,那么,Semaphore 的内部实现是怎样的呢?

  Semaphore 信号量也是Java 中一个同步容器,与CountDownLatch 和 CyclicBarrier 不同之处在于它内部的计数器是递增的。为了能够一览Semaphore的内部结构,我们首先要看一下Semaphore的类图,类图,如下所示:

5f71589919e7c24d3357bc8f5f14636690cbb5a0

 

 如上类图可以知道Semaphoren内部还是使用AQS来实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,分别代表获取信号量的时候是否采取公平策略。创建Semaphore的时候会有一个变量标示是否使用公平策略,源码如下:


 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new       
        NonfairSync(permits);
    }

   Sync(int permits) {
       setState(permits);
   }


如上面代码所示,Semaphore默认使用的是非公平策略,如果你需要公平策略,则可以使用带两个参数的构造函数来构造Semaphore对象,另外和CountDownLatch一样,构造函数里面传递的初始化信号量个数 permits 被赋值给了AQS 的state状态变量,也就是说这里AQS的state值表示当前持有的信号量个数。

 

接下来我们主要看看Semaphore实现的主要方法的源码,如下:

  1.void acquire() 当前线程调用该方法的时候,目的是希望获取一个信号量资源,如果当前信号量计数个数大于 0 ,并且当前线程获取到了一个信号量则该方法直接返回,当前信号量的计数会减少 1 。否则会被放入AQS的阻塞队列,当前线程被挂起,直到其他线程调用了release方法释放了信号量,并且当前线程通过竞争获取到了改信号量。当前线程被其他线程调用了 interrupte()方法中断后,当前线程会抛出 InterruptedException异常返回。源码如下:


public void acquire() throws InterruptedException {
        //传递参数为1,说明要获取1个信号量资源
        sync.acquireSharedInterruptibly(1);
   }
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {

        //(1)如果线程被中断,则抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();

        //(2)否者调用sync子类方法尝试获取,这里根据构造函数确定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //如果获取失败则放入阻塞队列,然后再次尝试如果失败则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
    }


如上代码可知,acquire()内部调用了sync的acquireSharedInterruptibly  方法,后者是对中断响应的(如果当前线程被中断,则抛出中断异常),尝试获取信号量资源的AQS的方法tryAcquireShared 是由 sync 的子类实现,所以这里就要分公平性了,这里先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法,源码如下:


protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);

}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
     //获取当前信号量值
     int available = getState();
     //计算当前剩余值
     int remaining = available - acquires;
     //如果当前剩余小于0或者CAS设置成功则返回
     if (remaining < 0 ||
         compareAndSetState(available, remaining))
         return remaining;
    }
}


如上代码,先计算当前信号量值(available)减去需要获取的值(acquires) 得到剩余的信号量个数(remaining),如果剩余值小于 0 说明当前信号量个数满足不了需求,则直接返回负数,然后当前线程会被放入AQS的阻塞队列,当前线程被挂起。如果剩余值大于 0 则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性获取的,是说先调用aquire方法获取信号量的线程不一定比后来者先获取锁。

 

接下来我们要看看公平性的FairSync 类是如何保证公平性的,源码如下:


protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
 }


可以知道公平性还是靠 hasQueuedPredecessors 这个方法来做的,以前的随笔已经讲过公平性是看当前线程节点是否有前驱节点也在等待获取该资源,如果是则自己放弃获取的权力,然后当前线程会被放入AQS阻塞队列,否则就去获取。hasQueuedPredecessors源码如下:


public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}


如上面代码所示,如果当前线程节点有前驱节点则返回true,否则如果当前AQS队列为空 或者 当前线程节点是AQS的第一个节点则返回 false ,其中,如果 h == t 则说明当前队列为空则直接返回 false,如果 h !=t 并且 s == null 说明有一个元素将要作为AQS的第一个节点入队列(回顾下 enq 函数第一个元素入队列是两步操作,首先创建一个哨兵头节点,然后第一个元素插入到哨兵节点后面),那么返回 true,如果  h !=t 并且 s != null 并且  s.thread != Thread.currentThread() 则说明队列里面的第一个元素不是当前线程则返回 true。

 

  2.void acquire(int permits) 该方法与 acquire() 不同在与后者只需要获取一个信号量值,而前者则获取指定 permits 个,源码如下:


public void acquire(int permits) throws InterruptedException {
    if (permits < 0) 
      throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


 

  3.void acquireUninterruptibly() 该方法与 acquire() 类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源过程中(包含被阻塞后)其它线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志当前线程并不会抛出 InterruptedException 异常而返回。源码如下:


public void acquireUninterruptibly() {
     sync.acquireShared(1);
}


 

  4.void acquireUninterruptibly(int permits) 该方法与 acquire(int permits) 不同在于该方法对中断不响应。源码如如下:


 public void acquireUninterruptibly(int permits) {
      if (permits < 0) 
           throw new IllegalArgumentException();
      sync.acquireShared(permits);
 }


 

  5.void release() 该方法作用是把当前 semaphore对象的信号量值增加 1 ,如果当前有线程因为调用 acquire 方法被阻塞放入了 AQS的阻塞队列,则会根据公平策略选择一个线程进行激活,激活的线程会尝试获取刚增加的信号量,源码如下:


public void release() {
        //(1)arg=1
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {

        //(2)尝试释放资源
        if (tryReleaseShared(arg)) {

            //(3)资源释放成功则调用park唤醒AQS队列里面最先挂起的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {

            //(4)获取当前信号量值
            int current = getState();

            //(5)当前信号量值增加releases,这里为增加1
            int next = current + releases;
            if (next < current) // 移除处理
                throw new Error("Maximum permit count exceeded");

            //(6)使用cas保证更新信号量值的原子性
            if (compareAndSetState(current, next))
                return true;
        }
    }


如上面代码可以看到 release()方法中对 sync.releaseShared(1),可以知道release方法每次只会对信号量值增加 1 ,tryReleaseShared方法是无限循环,使用CAS保证了 release 方法对信号量递增 1 的原子性操作。当tryReleaseShared 方法增加信号量成功后会执行代码(3),调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。

 

  6.void release(int permits) 该方法与不带参数的不同之处在于前者每次调用会在信号量值原来基础上增加 permits,而后者每次增加 1。源码如下:


public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}


另外注意到这里调用的是 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会阻塞。

 

到目前已经知道了其原理,接下来用一个例子来加深对Semaphore的理解,例子如下:


package com.hjc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by cong on 2018/7/8.
 */
public class SemaphoreTest {

    // 创建一个Semaphore实例
    private static volatile Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " over");
                    semaphore.release();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 等待子线程执行完毕,返回
        semaphore.acquire(2);
        System.out.println("all child thread over!");

        //关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

类似于 CountDownLatch,上面我们的例子也是在主线程中开启两个子线程进行执行,等所有子线程执行完毕后主线程在继续向下运行。

如上代码首先首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器为 0,然后 main 函数添加两个线程任务到线程池,每个线程内部调用了信号量的 release 方法,相当于计数值递增一,最后在 main 线程里面调用信号量的 acquire 方法,参数传递为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 时才会返回。

看到这里也就明白了,如果构造 Semaphore 时候传递的参数为 N,在 M 个线程中调用了该信号量的 release 方法,那么在调用 acquire 对 M 个线程进行同步时候传递的参数应该是 M+N;

 

对CountDownLatch,CyclicBarrier,Semaphored这三者之间的比较总结:

  1.CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器为 0,而不管当前线程是否结束调用 await 的线程就可以往下执行,相比使用 jion 必须等待线程执行完毕后主线程才会继续向下运行更灵活。

  2.CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者当计数器变为 0 后,就不能在被复用,而前者则使用 reset 方法可以重置后复用,前者对同一个算法但是输入参数不同的类似场景下比较适用。

  3.而 semaphore 采用了信号量递增的策略,一开始并不需要关心需要同步的线程个数,等调用 aquire 时候在指定需要同步个数,并且提供了获取信号量的公平性策略。

目录
相关文章
|
4天前
|
IDE Java 物联网
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
11 0
|
4天前
|
安全 Java 开发者
Java并发编程:深入理解Synchronized关键字
【4月更文挑战第19天】 在Java多线程编程中,为了确保数据的一致性和线程安全,我们经常需要使用到同步机制。其中,`synchronized`关键字是最为常见的一种方式,它能够保证在同一时刻只有一个线程可以访问某个对象的特定代码段。本文将深入探讨`synchronized`关键字的原理、用法以及性能影响,并通过具体示例来展示如何在Java程序中有效地应用这一技术。
|
5天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
5天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
6天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
6天前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【4月更文挑战第17天】本文探讨了Java中的CompletableFuture和反应式编程在提升异步编程体验上的作用。CompletableFuture作为Java 8引入的Future扩展,提供了一套流畅的链式API,简化异步操作,如示例所示的非阻塞数据库查询。反应式编程则关注数据流和变化传播,通过Reactor等框架实现高度响应的异步处理。两者结合,如将CompletableFuture转换为Mono或Flux,可以兼顾灵活性和资源管理,适应现代高并发环境的需求。开发者可按需选择和整合这两种技术,优化系统性能和响应能力。
|
7天前
|
缓存 监控 Java
Java并发编程:线程池与任务调度
【4月更文挑战第16天】Java并发编程中,线程池和任务调度是核心概念,能提升系统性能和响应速度。线程池通过重用线程减少创建销毁开销,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。任务调度允许立即或延迟执行任务,具有灵活性。最佳实践包括合理配置线程池大小、避免过度使用线程、及时关闭线程池和处理异常。掌握这些能有效管理并发任务,避免性能瓶颈。
|
7天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
Java 数据库 容器
java中使用Semaphore构建阻塞对象池
java中使用Semaphore构建阻塞对象池
|
11天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。

热门文章

最新文章