Java Review - 并发编程_ 信号量Semaphore原理&源码剖析

简介: Java Review - 并发编程_ 信号量Semaphore原理&源码剖析

195d03d17afc4a928bc581f313b01dfe.png

概述


Semaphore信号量也是Java中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。


小Demo

同样下面的例子也是在主线程中开启两个子线程让它们执行,等所有子线程执行完毕后主线程再继续向下运行。

import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/14 23:59
 * @mark: show me the code , change the world
 */
public class SemphoreTest {
    // 1 创建Sempaphore实例  当前信号量计数器的值为0
    private static Semaphore semaphore = new Semaphore(0);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 线程1 提交到线程池
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
            // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
            semaphore.release();
        });
        // 线程2 提交到线程池
        executorService.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " 执行结束 "  + LocalTime.now());
                // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 1 等待子线程执行任务完成后返回
        semaphore.acquire(2);
        System.out.println(Thread.currentThread().getName() + "任务执行结束 " + LocalTime.now()) ;
        // 关闭线程池
        executorService.shutdown();
    }
}


6830644ed3554d99b372a74ac868acd5.png


首先创建了一个信号量实例,构造函数的入参为0,说明当前信号量计数器的值为0


然后main函数向线程池添加两个线程任务,在每个线程内部调用信号量的release方法,这相当于让计数器值递增1


最后在main线程里面调用信号量的acquire方法,传参为2说明调用acquire方法的线程会一直阻塞,直到信号量的计数变为2才会返回


看到这里也就明白了,如果构造Semaphore时传递的参数为N,并在M个线程中调用了该信号量的release方法,那么在调用acquire使M个线程同步时传递的参数应该是M+N。


下面举个例子来模拟【CyclicBarrier复用】的功能,代码如下


import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/12/14 23:59
 * @mark: show me the code , change the world
 */
public class SemphoreTest2 {
    // 1 创建Sempaphore实例
    private static Semaphore semaphore = new Semaphore(0);
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 线程1 提交到线程池
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
            // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
            semaphore.release();
        });
        // 线程2 提交到线程池
        executorService.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " 执行结束 "  + LocalTime.now());
                // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 1 等待子线程执行任务完成后返回
        semaphore.acquire(2);
        // 线程3 提交到线程池
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " 执行结束 " + LocalTime.now());
            // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
            semaphore.release();
        });
        // 线程4 提交到线程池
        executorService.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " 执行结束 "  + LocalTime.now());
                // 在每个线程内部调用信号量的release方法,这相当于让计数器值递增1
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 2等待子线程执行任务完成后返回
        semaphore.acquire(2);
        System.out.println(Thread.currentThread().getName() + "任务执行结束 " + LocalTime.now()) ;
        // 关闭线程池
        executorService.shutdown();
    }
}


首先将线程1和线程2加入到线程池。主线程执行代码(1)后被阻塞。线程1和线程2调用release方法后信号量的值变为了2,这时候主线程的aquire方法会在获取到2个信号量后返回(返回后当前信号量值为0)。


然后主线程添加线程3和线程4到线程池,之后主线程执行代码(2)后被阻塞(因为主线程要获取2个信号量,而当前信号量个数为0)。当线程3和线程4执行完release方法后,主线程才返回。


从本例子可以看出,Semaphore在某种程度上实现了CyclicBarrier的复用功能。


类关系概述



a487b27f65dd423ea62052d8dc59d651.png



由该类图可知,Semaphore还是使用AQS实现的。Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。


例如,下面的代码在创建Semaphore时会使用一个变量指定是否使用公平策略。


   public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
  Sync(int permits) {
            setState(permits);
        }


Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。


另外,如CountDownLatch构造函数传递的初始化信号量个数permits被赋给了AQS的state状态变量一样,这里AQS的state值也表示当前持有的信号量个数。


核心方法源码解读

void acquire()

  public void acquire() throws InterruptedException {
      // 传递参数为1 ,说明要获取一个信号量资源
        sync.acquireSharedInterruptibly(1);
    }
  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 1 . 如果线程被中断,抛出被中断异常   
        if (Thread.interrupted())
            throw new InterruptedException();
        // 2  否则调用Syn子类方法尝试重新获取  
        if (tryAcquireShared(arg) < 0)
          // 如果获取失败,则放入阻塞队列,然后再次尝试,如果失败则调用park方法挂起当前线程
            doAcquireSharedInterruptibly(arg);
    }


acquire()在内部调用了Sync的acquireSharedInterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。

尝试获取信号量资源的AQS的方法tryAcquireShared是由Sync的子类实现的,所以这里分别从两方面来讨论。


非公平策略NonfairSync类的tryAcquireShared方法

be2296080d5442eb86370fdd5aa62dd8.png

继续看下 nonfairTryAcquireShared


44601c8de23942caba928ab22a671fcf.png

     final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
              // 获取当前信号量的值
                int available = getState();
                // 计算当前剩余值
                int remaining = available - acquires;
                // 如果当前值<0 或者 CAS设置成功则返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining)


如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。


如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。


另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。


举个例子:


线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列

过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量

但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源。


如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。


公平策略FairSync类的tryAcquireShared方法


43ed15650c99463a9adc77857c761e01.png

   /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        FairSync(int permits) {
            super(permits);
        }
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }


可见公平性还是靠hasQueuedPredecessors这个函数来保证的。前几篇博文里重点介绍了hasQueuedPredecessors。 公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。


void acquire(int permits)


该方法与acquire()方法不同,后者只需要获取一个信号量值,而前者则获取permits个。


  public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }


void acquireUninterruptibly()


该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。

  public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

看看响应中断的


d798b79b0f224666b5f7a5fcd0b2e20a.png


void acquireUninterruptibly(int permits)


该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。


    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }


void release()


该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

  public void release() {
      // 默认释放1个信号量 
        sync.releaseShared(1);
    }


    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
      // 2尝试释放资源
        if (tryReleaseShared(arg)) {
          // 3 资源释放成功,则调用park方法唤醒AQS  队列里最先挂起的线程  
            doReleaseShared();
            return true;
        }
        return false;
    }

85ef0c66a4be4a9f84ffc3abe91965ee.png


         protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) // cas 
                    return true;
            }


由代码release()->sync.releaseShared(1)可知,release方法每次只会对信号量值增加1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操作。tryReleaseShared方法增加信号量值成功后会执行代码(3)doReleaseShared();,即调用AQS的方法来激活因为调用aquire方法而被阻塞的线程。

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

void release(int permits)


该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加permits,而后者每次增加1。

    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }


另外可以看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会被阻塞。


小结


Semaphore也是使用AQS实现的,并且获取信号量时有公平策略和非公平策略之分。

相关文章
|
1天前
|
Java 数据挖掘 BI
Java医院绩效考核系统源码B/S+avue+MySQL助力医院实现精细化管理
医院绩效考核系统目标是实现对科室、病区财务指标、客户指标、流程指标、成长指标的全面考核、分析,并与奖金分配、学科建设水平评价挂钩。
28 0
|
3天前
|
数据采集 前端开发 Java
Java医院绩效考核系统源码maven+Visual Studio Code一体化人力资源saas平台系统源码
医院绩效解决方案包括医院绩效管理(BSC)、综合奖金核算(RBRVS),涵盖从绩效方案的咨询与定制、数据采集、绩效考核及反馈、绩效奖金核算到科到组、分配到员工个人全流程绩效管理;将医院、科室、医护人员利益绑定;全面激活人才活力;兼顾质量和效益、长期与短期利益;助力医院降本增效,持续改善、优化收入、成本结构。
14 0
|
4天前
|
监控 前端开发 Java
Java基于B/S医院绩效考核管理平台系统源码 医院智慧绩效管理系统源码
医院绩效考核系统是一个关键的管理工具,旨在评估和优化医院内部各部门、科室和员工的绩效。一个有效的绩效考核系统不仅能帮助医院实现其战略目标,还能提升医疗服务质量,增强患者满意度,并促进员工的专业成长
17 0
|
4天前
|
Java 云计算
Java智能区域医院云HIS系统SaaS源码
云HIS提供标准化、信息化、可共享的医疗信息管理系统,实现医患事务管理和临床诊疗管理等标准医疗管理信息系统的功能。优化就医、管理流程,提升患者满意度、基层首诊率,通过信息共享、辅助诊疗等手段,提高基层医生的服务能力构建和谐的基层医患关系。
30 2
|
5天前
|
Java
从源码出发:JAVA中对象的比较
从源码出发:JAVA中对象的比较
19 3
|
5天前
|
前端开发 Java 关系型数据库
Java医院绩效考核系统源码B/S架构+springboot三级公立医院绩效考核系统源码 医院综合绩效核算系统源码
作为医院用综合绩效核算系统,系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
27 2
|
5天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
21 0
|
XML 存储 Java
Java Review(三十三、异常处理----补充:断言、日志、调试)
Java Review(三十三、异常处理----补充:断言、日志、调试)
149 0
|
机器学习/深度学习 Java 程序员
Java Review(三十二、异常处理)
Java Review(三十二、异常处理)
114 0
Java Review(三十二、异常处理)
|
1天前
|
Java
Java中的多线程编程:基础知识与实践
【5月更文挑战第13天】在计算机科学中,多线程是一种使得程序可以同时执行多个任务的技术。在Java语言中,多线程的实现主要依赖于java.lang.Thread类和java.lang.Runnable接口。本文将深入探讨Java中的多线程编程,包括其基本概念、实现方法以及一些常见的问题和解决方案。