前言
谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer)
,所谓的AQS
即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
等都是基于AQS
来实现的。
我们先看下AQS
相关的UML
图:
思维导图(高清无损 AV 画质长图.pdf 关注公众号回复 AQS 获取)
AQS实现原理
AQS
中 维护了一个volatile int state
(代表共享资源)和一个FIFO
线程等待队列(多线程争用资源被阻塞时会进入此队列)。
这里volatile
能够保证多线程下的可见性,当state=1
则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO
的等待队列中,比列会被UNSAFE.park()
操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。
另外state
的操作都是通过CAS
来保证其并发修改的安全性。
具体原理我们可以用一张图来简单概括:
AQS
中提供了很多关于锁的实现方法,
- getState():获取锁的标志state值
- setState():设置锁的标志state值
- tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。
这里还有一些方法并没有列出来,接下来我们以ReentrantLock
作为突破点通过源码和画图的形式一步步了解AQS
内部实现原理。
目录结构
文章准备模拟多线程竞争锁、释放锁的场景来进行分析AQS
源码:
三个线程(线程一、线程二、线程三)同时来加锁/释放锁
目录如下:
- 线程一加锁成功时
AQS
内部实现 - 线程二/三加锁失败时
AQS
中等待队列的数据模型 - 线程一释放锁及线程二获取锁实现原理
- 通过线程场景来讲解公平锁具体实现原理
- 通过线程场景来讲解Condition中a
wait()
和signal()
实现原理
这里会通过画图来分析每个线程加锁、释放锁后AQS
内部的数据结构和实现原理
场景分析
线程一加锁成功
如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二和线程三抢占锁失败,具体执行流程如下:
此时AQS
内部数据为:
线程二、线程三加锁失败:
有图可以看出,等待队列中的节点Node
是一个双向链表,这里SIGNAL
是Node
中waitStatus
属性,Node
中还有一个nextWaiter
属性,这个并未在图中画出来,这个到后面Condition
会具体讲解的。
具体看下抢占锁代码实现:
java.util.concurrent.locks.ReentrantLock .NonfairSync:
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
这里使用的ReentrantLock非公平锁,线程进来直接利用CAS
尝试抢占锁,如果抢占成功state
值回被改为1,且设置对象独占锁线程为当前线程。如下所示:
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
线程二抢占锁失败
我们按照真实场景来分析,线程一抢占锁成功后,state
变为1,线程二通过CAS
修改state
变量必然会失败。此时AQS
中FIFO
(First In First Out 先进先出)队列中数据如图所示:
我们将线程二执行的逻辑一步步拆解来看:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先看看tryAcquire()
的具体实现:java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire()
:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
nonfairTryAcquire()
方法中首先会获取state
的值,如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加state
值,这就是可重入锁的具体实现,累加state
值,释放锁的时候也要依次递减state
值。
如果state
为0,则执行CAS
操作,尝试更新state
值为1,如果更新成功则代表当前线程加锁成功。
以线程二为例,因为线程一已经将state
修改为1,所以线程二通过CAS
修改state
的值不会成功。加锁失败。
线程二执行tryAcquire()
后会返回false,接着执行addWaiter(Node.EXCLUSIVE)
逻辑,将自己加入到一个FIFO
等待队列中,代码实现如下:
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter()
:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
这段代码首先会创建一个和当前线程绑定的Node
节点,Node
为双向链表。此时等待对内中的tail
指针为空,直接调用enq(node)
方法将当前线程加入等待队列尾部:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
第一遍循环时tail
指针为空,进入if逻辑,使用CAS
操作设置head
指针,将head
指向一个新创建的Node
节点。此时AQS
中数据:
执行完成之后,head
、tail
、t
都指向第一个Node
元素。
接着执行第二遍循环,进入else
逻辑,此时已经有了head
节点,这里要操作的就是将线程二对应的Node
节点挂到head
节点后面。此时队列中就有了两个Node
节点:
addWaiter()
方法执行完后,会返回当前线程创建的节点信息。继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
逻辑,此时传入的参数为线程二对应的Node
节点信息:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued()
:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndChecknIterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
acquireQueued()
这个方法会先判断当前传入的Node
对应的前置节点是否为head
,如果是则尝试加锁。加锁成功过则将当前节点设置为head
节点,然后空置之前的head
节点,方便后续被垃圾回收掉。
如果加锁失败或者Node
的前置节点不是head
节点,就会通过shouldParkAfterFailedAcquire
方法 将head
节点的waitStatus
变为了SIGNAL=-1
,最后执行parkAndChecknIterrupt
方法,调用LockSupport.park()
挂起当前线程。
此时AQS
中的数据如下图:
此时线程二就静静的待在AQS
的等待队列里面了,等着其他线程释放锁来唤醒它。
线程三抢占锁失败
看完了线程二抢占锁失败的分析,那么再来分析线程三抢占锁失败就很简单了,先看看addWaiter(Node mode)
方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
此时等待队列的tail
节点指向线程二,进入if
逻辑后,通过CAS
指令将tail
节点重新指向线程三。接着线程三调用enq()
方法执行入队操作,和上面线程二执行方式是一致的,入队后会修改线程二对应的Node
中的waitStatus=SIGNAL
。最后线程三也会被挂起。此时等待队列的数据如图: