AQS(abstractQueuedSynchronizer)锁实现原理详解
AQS是什么
AQS(abstractQueuedSynchronizer)抽象队列同步器。其本身是一个抽象类,提供lock锁的实现。聚合大量的锁机制实现的共用方法。
AQS(abstractQueuedSynchronizer)结构图
为什么要使用AQS
1、我们使用的可重入锁(公平锁/非公平锁)底层实现是AQS。
通过实现AQS(队列同步器)子类Sync的FairSync和NonfairSync
2、为什么线程争抢锁时,没有抢到资源,线程会进入队列。
3、没有抢到资源的线程为什么不是等待而是阻塞
AQS实现原理
概况图:
简略介绍
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// 线程取消
static final int CANCELLED = 1;
//后继线程需要唤醒
static final int SIGNAL = -1;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
*/
volatile int waitStatus;
// 指向前一个结点的指针
volatile Node prev;
// 指向下一个节点的指针
volatile Node next;
// 节点中的线程
volatile Thread thread;
Node nextWaiter;
//获取当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
}
// head指针,指向双向链表队列虚节点/或者空节点
private transient volatile Node head;
// tail指针,指向双向队列尾节点
private transient volatile Node tail;
// 锁状态 0 表示无线程使用,大于等于表示存在线程占用
private volatile int state;
**注意: 锁状态 0 表示无线程使用,大于等于表示存在线程占用
private volatile int state;**
示例
现在我们有三个线程,使用lock锁实现互斥访问。
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" out");
}finally {
lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" come in");
}finally {
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+" out");
},"B").start();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" come in");
}finally {
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+" out");
},"C").start();
}
执行结果
ABC三个线程同时通过lock.lock()获取锁,只有拿到锁才能访问资源。
注意:我们创建的是不公平锁
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
......
}
}
public void lock() {
sync.lock(); //sync为AQS子抽象类,由于其为抽象类
//通过子类NonfairSync实现
// 此时A线程访问这里
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
// A线程通过unsafe的CAS原语,判断state(锁状态)是否等于0线程。 等于0将state设置为1.表示锁已经线程占用。
// A结果为true
if (compareAndSetState(0, 1))
//将A线程设置为独占线程(独占锁的线程)
setExclusiveOwnerThread(Thread.currentThread());
else
//state锁状态为1,已经被占用。 线程 B,线程C进入这里
// 尝试获取
acquire(1);
}
}
}
此时图为这样,线程A已经占有锁状态state ,其它线程入队
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// AQS方法
// B,C线程会进入这里,假设B线程先进
public final void acquire(int arg) {
// B线程 尝试获取锁,返回false,取反为true,
// addWaiter 将B线程加入双向链表队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 由于是不公平锁,调用的是NonFairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取进入的线程 B
final Thread current = Thread.currentThread();
// 获取锁状态
int c = getState();
// 锁是否被占用 c != 0 ,B线程无法进入
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是否为栈有锁的线程 B != A
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
// 创建包含B线程的Node
Node node = new Node(Thread.currentThread(), mode);
// tail默认为null,pred指向null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 将B线程节点加入队列,返回B节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
// 第一次 t指向 taill 为null
// 第二次 t指向 taill 为空节点
Node t = tail;
if (t == null) { // Must initialize
// 通过CAS原语 将head指向空节点
if (compareAndSetHead(new Node()))
// 将tail也指向 空节点
tail = head;
} else {
// 将B线程 所在节点,设置前驱节点为空节点
node.prev = t;
// 将B线程 所在节点,设置尾节点。tail指向 B线程所在节点
if (compareAndSetTail(t, node)) {
// 空节点后继节点指向 B节点
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
到此处,线程B所在节点Node已经入队,并返回了B节点Node
// AQS方法
// B,C线程会进入这里,假设B线程先进
public final void acquire(int arg) {
// B线程 尝试获取锁,返回false,取反为true,
// addWaiter 将B线程加入双向链表队列 ,并返回B节点
// acquireQueued(Node,arg)堵塞B线程,自旋获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 第一次. 获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
// 第二次,获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 第一次 shouldParkAfterFailedAcquire,结果false,p为空节点,node为b节点
// 第二次进入 shouldParkAfterFailedAcquire。如果B线程获取锁失败应该堵塞,parkAndCheckInterrupt堵塞B线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 线程获取锁失败之后,应该堵塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 节点等待状态默认为0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 第一次进入,ws为0 设置为-1 返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 使用lockSupport堵塞线程,B线程停止运行
LockSupport.park(this);
return Thread.interrupted();
}
到目前为止,进入双向链表队列的B线程正式被堵塞
如果此时A线程unlock,双向列表中队列节点会不在堵塞,获取state执行
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" out");
}finally {
lock.unlock();
}
},"A").start();
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//A线程尝试释放锁
if (tryRelease(arg)) {
// 获取head指向的 空节点Node
Node h = head;
if (h != null && h.waitStatus != 0)
// 解锁后继节点线程 B
unparkSuccessor(h);
return true;
}
return false;
}
// 线程A尝试释放锁,
protected final boolean tryRelease(int releases) {
// state 此时由 1变为0,返回true
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 尝试解锁后继 节点线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点 B节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// B线程不堵塞
if (s != null)
LockSupport.unpark(s.thread);
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 第一次. 获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
// 第二次,获取B节点的前驱节点,空节点 == head ,尝试获取锁,false,因为state为1
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 第一次 shouldParkAfterFailedAcquire,结果false,p为空节点,node为b节点
// 第二次进入 shouldParkAfterFailedAcquire。如果B线程获取锁失败应该堵塞,parkAndCheckInterrupt堵塞B线程
// 一旦B线程不堵塞之后,又开始循环获取state状态锁,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
公平锁和不公平锁细微差比
上述中可以看到,当执行的线程释放锁后,直接让队列中空节点的后继节点线程不在堵塞获取锁资源。这是公平锁的体现啊???
但是我创建的是不公平锁啊?????
那么不公平体现在哪呢?
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
不公平锁的不公平性主要体现在 lock.lock()的时候,线程通过CAS原语来抢夺锁,谁先抢到state归谁。
公平锁则是直接从双向列表队列中获取
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
}
CAS原理总结
1、默认不公平锁, lock.lock()的时候,就开始争夺锁state(锁状态)
**2、没有争抢到锁的线程,会被封装为node节点,进入队列,然后堵塞park。
注意:head指针指向虚节点(空节点),tail指针指向尾节点**
3、当执行的线程unlock时,会通知队列中后继节点线程不堵塞unpark,继续执行任务即可