并发编程里面的很多常用的类,例如ReentrantLock,Semaphore,CountDownLatch实际上底层都是通过使用AbstractQueuedSynchronizer(AQS)来进行实现的。那么今天我们就来仔细聊聊AQS这样东西。
底层的核心主要是维护一个volatile int waitStatus的状态值,以及一个FIFO线程等待队列。对于waitStatus变量,AQS里面提供了三种方式:
- getState()
- setState()
- compareAndSetState()
同时在AQS里面定义了两种对于资源访问的方式,独占模式和共享模式。简单来说,独占模式就是一次只能由一个线程执行,例如常见了ReetrantLock,共享模式则是允许多个线程同时执行,例如说Semaphore,CountDownLatch。
AQS同步器里面经常会用到以下的几种方法:
//独占模式中常用的一种获取资源的函数 boolean tryAcquire(int arg) //独占模式中常用的一种释放资源的函数 boolean tryRelease(int arg) //该线程是否正在独占资源。只有用到condition才需要去实现它 boolean isHeldExclusively() //共享模式中常用到的获取资源的方式 int tryAcquireShared(int arg) //共享模式中常用到的释放资源的方式 boolean tryReleaseShared(int arg) 复制代码
源码分析模块:
我们首先来模拟一个场景进行思维导向。
在独占模式下一个线程请求获取资源的过程:
1.首先是进入aqs里面的public final void acquire(int arg) 这个函数中
来看看里面的源码先:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
首先是通过tryAcquire()请求获取资源,如果成功则直接结束。
addWaiter()函数通过阅读代码可以明白,它实际上是将当前线程放在了请求的队列最尾端
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } 复制代码
acquireQueued()函数里面的内容,通过阅读源码也可以明白其中的含义:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
通过一个for循环来不断地tryAcquire()来获取资源
再继续看源码tryAcquire()部分,这里面的代码内容是抛出一个异常,因为AQS本身是一个框架,对于具体的获取资源处理,它将其交给了开发者去自定义处理。
同步队列
同步器内部的数据结构其实是一个基于FIFO原则来设计的双向队列:
对于其中的每个node节点,都有一个叫做waitStatus的变量来表示当前线程的状态。查看源码可以看到这四种状态:
- CANCELLED:表示被中断或者取消的状态
- SIGNAL:表示前面的节点已经释放了同步锁,等待被唤醒的一个状态
- CONDITION:表示处于等待状态
- PROPAGATE:表示处于共享模式中的运行状态
AQS里面对于插入队尾的操作和我们常用的list集合插入操作有所出入,为了保证队列操作的原子性和有序性,源码里面采用了cas自旋的方式来实现。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
这一段代码和核心还是使用了unsafe类里面的接口来指定内存进行分配。
通过对于上述代码的分析,我们来一段小结先:
当多个线程同时发出请求的时候,会发生资源竞争(tryAcquire()和addWaiter()),所有请求的线程会按照先来后到的规矩排成一条队列,刚刚上述的分析主要是针对于获取锁的过程而言。那么对于未获取到资源的线程而言,下一步又应该处理什么呢?
接着我们来看到上边提到的acquireQueued()函数:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // 将引用对象置空,有利于GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
final Node p = node.predecessor();是获取当前等待节点的前一个节点,也就是即将释放锁的节点元素。这里面的核心主要还是通过for循环的方式进行自旋操作,不断地请求资源,一方获取资源成功之后,便将该节点的next和pre还有thread先置空,然后进入shouldParkAfterFailedAcquire(p, node)当中,继续深入源码分析:
这里可以结合我们前边提及到的waitStatus状态数值来进行分析
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 节点处于待唤醒状态 */ return true; if (ws > 0) { /* * 前驱节点被中断了,于是继续查找前驱节点,查看是否有元素满足要求 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 这里面也是调用了unsafe类里面的cas操作来进行节点状态的更新 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
直到进行到parkAndCheckInterrupt()函数里面,才算是真正的让该线程进入休眠状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
说完了获取锁的整个思路,再来看看锁的释放吧。
在同步队列里面的node节点元素汇总,有个waitStatus变量:
volatile int waitStatus;
这里之所以用volatile来进行修饰的原因,我个人认为是volatile修饰的关键字在进行修改了之后,会向其他线程发送信号,重新从主存中读取该值,保证了该变量在各个线程之间的可见性。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } 复制代码
这段代码中的核心部分为LockSupport.unpark(s.thread);主要是起到解除阻塞线程的一个作用,通俗点来说,就是唤醒队列中正在等待的下一个节点。
结合上述的独占式模式的一个分析思路,对于共享模式的分析也是大同小异了。