Java并发编程笔记之AbstractQueuedSynchronizer源码分析

简介: 为什么要说AbstractQueuedSynchronizer呢? 因为AbstractQueuedSynchronizer是JUC并发包中锁的底层支持,AbstractQueuedSynchronizer是抽象同步队列,简称AQS,是实现同步器的基础组件,并发包中锁的实现底层就是使用AQS实现,另外大多数人可能不会直接用到AQS, 但是知道其原理对于架构设计还是很有帮助的。

为什么要说AbstractQueuedSynchronizer呢?

因为AbstractQueuedSynchronizer是JUC并发包中锁的底层支持,AbstractQueuedSynchronizer是抽象同步队列,简称AQS,是实现同步器的基础组件,并发包中锁的实现底层就是使用AQS实现,另外大多数人可能不会直接用到AQS,

但是知道其原理对于架构设计还是很有帮助的。

 

首先我们看一下AQS的类图结构,如下图:

从类图的关系可以看到AQS是一个FIFO的双向队列,内部通过节点head 和 tail 记录队首和队尾元素,队列元素类型为Node。其中Node中Thread变量用来存放进入AQS队列里面的线程;Node 节点内部SHARED用来标记该线程是获取共享资源时候被阻塞挂起来后放入AQS队列,

EXCLUSIVE标记线程是获取独占资源时候被挂起后放入AQS队列;waitStatus记录当前线程等待状态,分别为CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在条件队列里面等待),PROPAGATE(释放共享资源时候需要通知其他节点);

pre记录当前节点的前驱节点,next记录当前节点后继节点。

 

AQS中维持了一个单一的状态信息state,可以通过getState,setState,compareAndSetState 函数修改其值;对于ReentrantLock 的实现来说,state 可以用来表示当前线程获取锁的可重入次数;

对应读写锁ReentrantReadWriteLock 来说state 的高16位表示读状态,也就是获取该读锁的次数,低 16位 表示获取到写锁的线程的可重入次数;对于FuterTask 来说,state用来表示任务状态(例如,还没开始,运行,完成,取消);

对应CountDownlatch 和CyclicBarrie 来说,state用来表示计数器当前的值。

 

AQS有个内部类ConditionObject 是用来结合锁实现线程同步,ConditionObject可以直接访问AQS对象内部的变量,比如 state 状态值 和AQS队列;

ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来存放调用条件变量的await()方法后被阻塞的线程,如上类图所示,这个条件队列的头尾元素分别为firstWaiter 和 lastWaiter。

 

对于AQS 来说,线程同步的关键是对状态值state进行操作,根据state是否属于一个线程,操作state的方式分为独占模式和共享模式。

独占模式下获取和释放资源使用方法的源码如下:

void acquire(int arg)
void acquireInterruptibly(int arg)
boolean release(int arg)

 

共享模式下获取和释放资源方法的源码如下:

void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)

 

对于独占锁方式获取的资源是与具体线程绑定的,也就是说如果一个线程获取到了资源,就会标记是那个线程获取到的,其他线程尝试操作state获取资源时候发现当前该资源不是自己持有,就会获取失败后被阻塞;

比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,AQS内部会首先使用CAS操作把state状态从0 变成 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁的时候,发现当前线程就是锁的持有者,则会把state状态值从1变成2,

也就是设置可重入次数,当另外一个线程获取锁的时候发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

 

对于共享操作方式资源是与具体线程不相关的,多个线程去请求资源时候是通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次获取时候,如果 当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可,

共享模式下并不需要记录哪个线程获取了资源;比如 Semaphore 信号量,当一个线程通过acquire()方法获取一个信号量时候,会首先看当前信号两个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

 

那么我们接下来首先看独占模式的获取和释放资源的流程,如下:

  1.当一个线程调用acquire(int arg)方法获取独占资源的时候,会首先使用tryAcquire 尝试获取资源,具体是设置状态变量state的值,成功则直接返回。失败则将当前线程封装为类型Node.EXCLUSIVE 的Node节点后插入到AQS阻塞队列尾部,

并调用LockSupport.part(this)挂起当前线程。

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  2.当一个线程调用release(int arg)的时候会尝试使用tryRelease 操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)激活AQS队列里面最早被阻塞的线程(thread)。

被激活的线程则会使用tryAcquire尝试看当前状态变量state的值是否满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

这里需要注意的是AQS类并没有提供可用的tryAcquire 和 tryRelease,正如AQS是锁阻塞和同步容器的基础框架,是抽象类,tryAcquire和 tryRelease 需要有具体的子类来实现的。

子类在实现tryAcquire 和 tryRelease 时候要根据具体场景使用CAS算法尝试修改该状态值state,成功则返回true,否则返回false。子类还需要定义在调用acquire 和 release 方法时候 state 状态值的增减代表什么含义。

 

比如继承自AQS实现的独占锁ReentrantLock,定义当status为0的时候标示锁空闲,为1 的时候标示锁已经被占用,在重写tryAcquire的时候,内部需要使用CAS算法看当前status是否为0,如果为0 则使用CAS设置为1,

并设置当前线程持有者为当前线程,并返回true,如果CAS失败则返回false。继承自 AQS 实现的独占锁实现 tryRelease 时候,内部需要使用CAS算法把当前status值从1 修改为0,并设置当前锁的持有者为null,然后返回true,如果CAS失败则返回false。

 

接下来我们看一下共享资模式的获取与释放流程,如下:

  1.当前线程调用acquireShared(int arg) 获取共享资源时候,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回。失败则将当前线程封装为类型Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列尾部,并使用 LockSupport.park(this) 挂起当前线程。

  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

 

  2.当一个线程调用 releaseShared(int arg) 时候会尝试使用, tryReleaseShared 操作释放资源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark(thread)激活 AQS 队列里面最早被阻塞的线程 (thread)。被激活的线程则使用 tryReleaseShared 尝试看当前状态变量 state 的值是否能满足自己的需要,

满足则该线程被激活然后继续向下运行,否者还是会被放入 AQS 队列并被挂起。

  

  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

同理需要注意的 AQS 类并没有提供可用的 tryAcquireShared 和 tryReleaseShared,正如 AQS 是锁阻塞和同步器的基础框架,tryAcquireShared 和 tryReleaseShared 需要有具体的子类来实现。

子类在实现 tryAcquireShared 和 tryReleaseShared 时候要根据具体场景使用 CAS 算法尝试修改状态值 state, 成功则返回 true,否者返回 false。

比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryAcquireShared 时候,首先看写锁是否被其它线程持有,如果是则直接返回 false,否者使用 CAS 递增 status 的高 16 位,在 ReentrantReadWriteLock 中 status 的高 16 为获取读锁的次数。

继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryReleaseShared 时候,内部需要使用 CAS 算法把当前 status 值的高 16 位减一,然后返回 true, 如果 cas 失败则返回 false。

 

基于 AQS 实现的锁除了需要重写上面介绍的方法,还需要重写 isHeldExclusively 方法用来判断锁是被当前线程独占还是被共享。

 

接下来我们再看看独占模式和共享模式下获取资源的带有 Interruptibly 关键字的函数

独占模式下的:

void acquire(int arg)
void acquireInterruptibly(int arg)

 

共享模式下的:

void acquireShared(int arg) 
void acquireSharedInterruptibly(int arg)

 

这两套函数都有一个带有 Interruptibly 关键字的函数,那么带有这个关键字的和不带有什么区别呢?

其实不带Interruptibly关键字的方法是不对中断进行响应,也就是线程在调用不带Interruptibly关键字的方法在获取资源的时候或者获取资源失败被挂起的时候,其他线程中断了该线程,那么改线程不会因为被中断而抛出异常,

还是继续获取资源或者不被挂起,也就是不对中断进行响应,忽略中断。

 

带Interruptibly关键字的方法是对中断进行响应,也就是说线程在调用带Interruptibly关键字的方法在获取资源的时候或者获取资源失败被挂起的时候,其他线程中断了该线程,那么该线程会抛出InterruptedException 异常而返回。

 

那么我要思考一下,AQS提供的队列是如何维护的呢?

我们主要看入队操作:当一个线程获取锁失败后,该线程会被转换为Node节点,然后就会使用enq(final Node node)方法插入该节点到AQS的阻塞队列,源码如下:

  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;//(1)
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//(2)
                    tail = head;
            } else {
                node.prev = t;//(3)
                if (compareAndSetTail(t, node)) {//(4)
                    t.next = node;
                    return t;
                }
            }
        }
    }    

源码程序逻辑的节点图如下:

结合上图和上面的源码,如上代码,第一次循环当要在AQS 队列尾部插入元素时候,AQS队列状态为图(default),也就是队列头尾节点都指向null;

当执行代码(1)后,节点 t 指向了尾部节点,这时候队列如图(1)。

可知这时候 t 为 null,则执行代码(2)使用 CAS 算法设置一个哨兵节点为头结点,如果 CAS 设置成功,然后让尾部节点也指向哨兵节点,这时候队列状态如图(II)。

到现在只是插入了一个哨兵节点,还需要插入的 node 节点,所以第二次循环后执行到步骤(1),这时候队列状态如图(III);

然后执行代码(3)设置 node 的前驱节点为尾部节点,这时候队列状态图如图(IV);然后通过 CAS 算法设置 node 节点为尾部节点,CAS 成功后队列状态图为(V);

CAS 成功后在设置原来的尾部节点的后驱节点为 node, 这时候就完成了双向链表的插入了,这时候队列状态为图(VI)。

 

AQS —— 条件变量的支持

notify 和 wait 是配合 synchronized 内置锁实现线程间同步基础设施,条件变量的 signal 和 await 方法是用来配合锁(使用 AQS 实现的锁)实现线程间同步的基础设施。

调用共享变量的 notify 和 wait 方法前必须先获取该共享变量的内置锁,同理在调用条件变量的 signal 和 await 方法前必须先获取条件变量对应的锁。

 

那么什么是条件变量呢?如何使用呢?下面看一个例子,代码如下:

ReentrantLock lock = new ReentrantLock();//(1)
Condition condition = lock.newCondition();//(2)

lock.lock();//(3)
try {
    System.out.println("begin wait");
    condition.await();//(4)
    System.out.println("end wait");

} catch (Exception e) {
    e.printStackTrace();

} finally {
    lock.unlock();//(5)
}

lock.lock();//(6) try { System.out.println("begin signal"); condition.signal();//(7) System.out.println("end signal"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//(8) }

代码(1)创建了一个独占锁 ReentrantLock 的对象,ReentrantLock 是基于 AQS 实现的锁。

代码(2)使用创建的 lock 对象的 newCondition()方法创建了一个 ConditionObject 变量,这个变量就是 lock 锁对应的一个条件变量。需要注意的是一个 Lock 对象可以创建多个条件变量。

代码(3)首先获取了独占锁,代码(4) 则调用了条件变量的 await()方法阻塞挂起了当前线程,当其它线程调用了条件变量的 signal 方法时候,被阻塞的线程才会从 await 处返回,需要注意的是和调用 Object 的 wait 方法一样,

如果在没有获取到锁前调用了条件变量的 await 方法会抛出 java.lang.IllegalMonitorStateException 异常。

代码(5) 则释放了获取的锁。

其实这里的lock对象等价于synchronized 加上共享变量,当调用lock.lock()方法就相当于进入了 synchronized 块(获取了共享变量的内置锁),当调用 lock.unLock() 方法时候就相当于退出了 synchronized 块。

当调用条件变量的 await() 方法时候就相当于调用了共享变量的 wait() 方法,当调用了条件变量的 signal 方法时候就相当于调用了共享变量的 notify() 方法。当调用了条件变量的 signalAll()方法时候就相当于调用了共享变量的 notifyAll() 方法。

 

到这里对条件变量有了一定的认识了,上面通过lock.newCondition() 作用其实是new  了一个AQS内部声明的ConditionObject对象,如一开始所看到的类图,ConditionObject 是AQS的内部类,可以访问到AQS内部的变量(例如状态变量 status 变量)和方法。

对应每个条件变量的内部维护了一个条件队列,用来存放当调用条件变量的await()方法被阻塞的线程。注意这个条件队列和AQS队列不是一回事。

 

 

如下代码,当线程调用了条件变量的await()方法时候(事先必须先调用了锁的 lock() 方法获取锁),内部会构造一个类型为Node.CONDITION 的 node 节点,然后插入该节点到条件队列末尾,然后当前线程会释放获取的锁(也就是会操作锁对应的 status 变量的值),并被阻塞挂起。

这时候如果有其他线程调用了lock.lock() 尝试获取锁时候,就会有一个线程获取到锁,如果获取到锁的线程有调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,阻塞到await()方法处。

源码如下:

  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
             //创建新的node,并插入到条件队列末尾(9)
            Node node = addConditionWaiter();
            //释放当前线程获取的锁(10)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //调用park方法阻塞挂起当前线程(11)
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            ...
      }

 

如下代码,当另外一个线程调用了条件变量的signal方法的时候(事先必须先调用了锁的 lock() 方法获取锁),内部会把条件队列里面队头的一个线程节点从条件队列里面移除后放入到AQS的阻塞队列里面,然后激活该线程。

  public final void signal() {
       if (!isHeldExclusively())throw new IllegalMonitorStateException();
          Node first = firstWaiter;
       if (first != null)
          //移动条件队列队头元素到AQS队列
           doSignal(first);
   }

 

需要注意的是AQS只提供了ConditionObject 的实现,并没有提供 newCondition 函数来 new 一个 ConditionObject 对象,需要由 AQS 的子类来提供 newCondition 函数。

 

接下来我们要看一下当一个线程调用条件变量的 await() 方法被阻塞后,如何放入的条件队列。源码如下:‘

   private Node addConditionWaiter() {
            Node t = lastWaiter;
            ...
            //(1)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //(2)
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;//(3)
            lastWaiter = node;//(4)
            return node;
        }

如上代码(1)首先根据当前线程创建了一个类型为Node.CONDITION 的节点,然后通过步骤(2)(3)(4)在单向条件队列尾部插入一个元素。

 

最后注意如下几点:

  1.当多个线程同时调用 lock.lock() 获取锁的时候,同时只有一个线程获取到了该锁,其他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁;

  2.如果获取到锁的线程又调用了对应的条件变量的 await() 方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面;

  3.这时候因为调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的 await()方法则该线程也会被放入条件变量的条件队列。

  4.当另外一个线程调用了条件变量的 signal() 或者 signalAll() 方法时候,会把条件队列里面的一个或者全部 Node 节点移动到 AQS 的阻塞队列里面,等待时机获取锁。

如下图:

如上图可以看到,一个锁对应有一个 AQS 阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。

目录
相关文章
|
4天前
|
Java 数据库连接 网络安全
JDBC数据库编程(java实训报告)
这篇文章是关于JDBC数据库编程的实训报告,涵盖了实验要求、实验环境、实验内容和总结。文中详细介绍了如何使用Java JDBC技术连接数据库,并进行增删改查等基本操作。实验内容包括建立数据库连接、查询、添加、删除和修改数据,每个部分都提供了相应的Java代码示例和操作测试结果截图。作者在总结中分享了在实验过程中遇到的问题和解决方案,以及对Java与数据库连接操作的掌握情况。
JDBC数据库编程(java实训报告)
|
1天前
|
Java 开发者
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案
在Java编程中,if-else与switch作为核心的条件控制语句,各有千秋。if-else基于条件分支,适用于复杂逻辑;而switch则擅长处理枚举或固定选项列表,提供简洁高效的解决方案。本文通过技术综述及示例代码,剖析两者在性能上的差异。if-else具有短路特性,但条件增多时JVM会优化提升性能;switch则利用跳转表机制,在处理大量固定选项时表现出色。通过实验对比可见,switch在重复case值处理上通常更快。尽管如此,选择时还需兼顾代码的可读性和维护性。理解这些细节有助于开发者编写出既高效又优雅的Java代码。
6 2
|
1天前
|
Java 开发者
在Java编程的广阔天地中,if-else与switch语句犹如两位老练的舵手,引领着代码的流向,决定着程序的走向。
在Java编程中,if-else与switch语句是条件判断的两大利器。本文通过丰富的示例,深入浅出地解析两者的特点与应用场景。if-else适用于逻辑复杂的判断,而switch则在处理固定选项或多分支选择时更为高效。从逻辑复杂度、可读性到性能考量,我们将帮助你掌握何时选用哪种语句,让你在编程时更加得心应手。无论面对何种挑战,都能找到最适合的解决方案。
5 1
|
1天前
|
搜索推荐 Java 程序员
在Java编程的旅程中,条件语句是每位开发者不可或缺的伙伴,它如同导航系统,引导着程序根据不同的情况做出响应。
在Java编程中,条件语句是引导程序根据不同情境作出响应的核心工具。本文通过四个案例深入浅出地介绍了如何巧妙运用if-else与switch语句。从基础的用户登录验证到利用switch处理枚举类型,再到条件语句的嵌套与组合,最后探讨了代码的优化与重构。每个案例都旨在帮助开发者提升编码效率与代码质量,无论是初学者还是资深程序员,都能从中获得灵感,让自己的Java代码更加优雅和专业。
5 1
|
1天前
|
Java
在Java编程的广阔天地中,条件语句是控制程序流程、实现逻辑判断的重要工具。
在Java编程中,if-else与switch作为核心条件语句,各具特色。if-else以其高度灵活性,适用于复杂逻辑判断,支持多种条件组合;而switch在多分支选择上表现优异,尤其适合处理枚举类型或固定选项集,通过内部跳转表提高执行效率。两者各有千秋:if-else擅长复杂逻辑,switch则在多分支选择中更胜一筹。理解它们的特点并在合适场景下使用,能够编写出更高效、易读的Java代码。
5 1
|
1天前
|
缓存 负载均衡 安全
|
3天前
|
存储 缓存 安全
深度剖析Java HashMap:源码分析、线程安全与最佳实践
深度剖析Java HashMap:源码分析、线程安全与最佳实践
|
3天前
|
设计模式 算法 Java
Java编程中的设计模式:简化复杂性的艺术
在Java的世界中,设计模式如同一位智慧的导师,指引着开发者们在复杂的编码迷宫中找到出口。本文将深入浅出地探讨几种常见的设计模式,通过实例演示如何在Java项目实践中运用这些模式,从而提升代码的可维护性和扩展性。无论你是新手还是资深开发者,这篇文章都将为你打开一扇通往高效编码的大门。
12 1
|
5天前
|
存储 安全 Java
java集合框架学习笔记
这篇文章是关于Java集合框架的详细学习笔记,包括集合的概念、使用方式以及List、Set和Map等集合类型的具体实现和特点。
java集合框架学习笔记
|
3天前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践