前提回顾
之前写了相关原理和部分源码流程的介绍文字 ,☕【Java原理探索】夯实你对AQS的认识和基础。本篇文章主要写了相关针对于源码的分析和认识
大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLock,Semaphore,Java线程池中的Worker等。本文将介绍AQS相关的实现细节。
AbstractQueuedSynchronizer介绍
AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState及compareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore中该状态值表示剩余的许可数量。
可以看下使用的AbstractQueuedSynchronizer的并发工具类:
AbstractQueuedSynchronizer实现
AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:
AbstractOwnableSynchronizer
当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } /** * 当前独占同步器的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置当前独占同步器的线程 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /** * 获取当前独占同步器的线程 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } } 复制代码
相关解释
针对于排他锁属性,存放相关排他锁(当前独占同步器的线程),设置当前的排他锁线程属性对应线程(Thread)引用。主要是设置排他锁(拥有占用当前对象资源的线程对象引用)
AbstractQueuedSynchronizer(CLH锁的原理)
AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。
CLH锁的链表中的节点被抽象为Node:
static final class Node { /** * 标记节点正以共享模式等待 */ static final Node SHARED = new Node(); /** * 标记节点正以独占模式等待 */ static final Node EXCLUSIVE = null; // ===== 以下表示节点的等待状态 ===== /** * 表示当前的线程被取消 */ static final int CANCELLED = 1; /** * 表示当前节点的后继节点包含的线程需要运行,也就是park下的状态一样,千万别混淆了。 */ static final int SIGNAL = -1; /** * 表示当前节点在等待condition,也就是在condition队列中,更多的就是 在于wait或者sleep等情况下的状态。 */ static final int CONDITION = -2; /** * 示当前场景下后续的acquireShared能够得以执行 */ static final int PROPAGATE = -3; /** * 状态 */ volatile int waitStatus; /** * 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成 * 连接。 */ volatile Node prev; /** * 后继结点 */ volatile Node next; /** * 入队列时的当前线程 */ volatile Thread thread; /** * 存储condition队列中的后继节点。 */ Node nextWaiter; ... } 复制代码
相关解释
- 判断是否是共享锁或者排他锁机制类型。
- 标识当前线程对象节点的状态类型:Cancel代表取消状态,当执行cancelAcquire之后节点的状态。也有Signal状态的,标识待被前置节点唤醒(本质是后置节点进行轮询进行控制状态),也有Condition状态,代表其他形式的阻塞或者等待排队机制,PROPAGATE状态,更多处于相关的迁移机制,允许同时进行释放唤醒多个waiter节点。
- 以及等待下一个状态的nextwaiter节点机制标识conidtion状态下的后置节点。
- 包含当前节点所含的线程对象、前置和后置指针。
其中AbstractQueuedSynchronizer维护的链表结构大致如下:
ReentrantLock
可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lock和unlock操作:
public class ReentrantLock implements Lock, java.io.Serializable { // 同步器,用于实现锁机制 private final Sync sync; /** * 基础的同步器实现 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 由公平锁和非公平锁实现 */ abstract void lock(); /** * 非公平锁时,尝试加锁 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 同步器状态 int c = getState(); if (c == 0) { // 若同步器状态为初始状态,则尝试加锁 if (compareAndSetState(0, acquires)) { // 设置锁的占用线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经加锁过,则设置state为锁的重入次数+1 int nextc = c + acquires; if (nextc < 0) // 超出了锁重入的最大次数 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 尝试释放同步器 */ protected final boolean tryRelease(int releases) { // 释放后的新状态 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) // 非占用线程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // state归零,释放成功 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /* * 当前线程是否独占该锁 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /* * 创建一个条件对象 */ final ConditionObject newCondition() { return new ConditionObject(); } /* * 获取当前独占线程 */ final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } /* * 获取锁被重入的次数 */ final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } /* * 锁是否被占用 */ final boolean isLocked() { return getState() != 0; } /** * 从对象流中反序列化锁对象 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); // 重置为初始状态 setState(0); } } 复制代码
非公平锁机制Sync同步器
/** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加锁 */ final void lock() { // 先尝试直接加锁,即抢占式 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 失败后,就排队抢锁 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 复制代码
公平锁机制Sync同步器
/** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 直接进行排队抢锁,保持公平 acquire(1); } /** * 尝试获取锁 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 若没有其他线程已经在等待队列中,则尝试加锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程已经占有锁,则重入次数 + acquires int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 复制代码
默认为非公平锁(性能极佳)
/** * 默认非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 请求锁 */ public void lock() { sync.lock(); } /** * 尝试加锁,可被中断 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试加锁 */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** * 加锁,具有超时限制 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 解锁 */ public void unlock() { sync.release(1); } /** * 创建一个条件对象 */ public Condition newCondition() { return sync.newCondition(); } /** * 获取锁的重入次数 */ public int getHoldCount() { return sync.getHoldCount(); } /** * 锁是否被当前线程持有 */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 锁是否已被持有 */ public boolean isLocked() { return sync.isLocked(); } /** * 是否是公平锁 */ public final boolean isFair() { return sync instanceof FairSync; } /** * 获取占用锁的线程 */ protected Thread getOwner() { return sync.getOwner(); } /** * 是否有等待的线程 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 判断线程是否在等待队列中 */ public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } /** * 获取等待队列长度,并发时,不是绝对精确 */ public final int getQueueLength() { return sync.getQueueLength(); } /** * 获取等待的线程集合,不是绝对精确 */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } /** * 判断是否有线程在某条件上等待 */ public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } /** * 获取在某条件上等待的线程数 */ public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } /** * 获取在某条件上等待的线程集 */ protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } } 复制代码
当ReentrantLock执行lock()时,主要是通过AbstractQueuedSynchronizer的acquire()方法实现:
// ReentrantLock.lock() public void lock() { sync.lock(); } // FairSync.sync() final void lock() { acquire(1); } // 获取锁 public final void acquire(int arg) { // 尝试获取锁: // 1. 成功时,直接返回 // 2. 失败时,以独占的方式将当前线程入队addWaiter,并且等待自旋等待acquireQueued if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 若等待返回了,则中断自己 selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先尝试直接入队尾, // 并发时有可能失败,则通过enq入队 Node pred = tail; if (pred != null) { // 已经有线程在等待,则尝试直接设置node为tail node.prev = pred; if (compareAndSetTail(pred, node)) { // 链接旧的tail.next -> node pred.next = node; return node; } } //等待队列为空或并发时compareAndSetTail失败,则尝试继续插入等待节点 enq(node); return node; } // 新节点入队,返回旧的尾节点 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 等待队列此时为空,初始化头节点 if (compareAndSetHead(new Node())) // 初始化尾节点 tail = head; } else { // 等待队列不为空 // 链接新节点的前驱节点为尾节点 node.prev = t; // 设置新节点为尾节点 if (compareAndSetTail(t, node)) { // 链接旧尾节点的后驱节点为新节点 t.next = node; return t; } } } } // 自旋等待 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 从当前节点往前找到头节点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 已获取到锁,设置新的头节点,相当于节点出队 setHead(node); // 释放掉等待节点,头节点是没有next属性的 p.next = null; // help GC failed = false; return interrupted; } // 当请求锁失败后,检查节点是否被需要阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 若排队失败,则取消获取锁的请求 if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 节点的前驱节点状态为SIGNAL时,表示该节点已经请求过需要被唤醒,可以安全地阻塞 */ return true; if (ws > 0) { /* * 若前驱节点已被取消,则忽略这些取消的节点,继续往前查找未 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; }else { /* * 此时前驱节点的状态为0或PROPAGATE(-3),此时需要一个唤醒节点信号,但没必要阻塞线程 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 阻塞当前线程 LockSupport.park(this); // 线程是否被中断,且重置中断状态 return Thread.interrupted(); } 复制代码
内容解释
- tryAcquire:主要用于自定义实现state状态,一遍可以实现相关线程的阻塞以及换线操作
- addWaiter:主要添加任务节点到对位,包含了cas设置机制以及自旋方式进行设置队尾机制
- acquireQueued:主要包含了相关的设置前驱节点的状态机制从而是进行先关的任务状态设置,此 外,还有一些是否需要阻塞进行当前任务机制。
- parkAndCheckInterrupt:进行自我中断机制,
尝试取消请求
// private void cancelAcquire(Node node) { // 忽略不存在的节点 if (node == null) return; // 节点线程置空 node.thread = null; // 忽略取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext节点,表示node节点前的第一个非取消状态节点的后继节点 Node predNext = pred.next; // 将节点状态设置为取消 node.waitStatus = Node.CANCELLED; // 如果当前节点是尾节点,设置新的尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 将node的后继节点置空 compareAndSetNext(pred, predNext, null); } else { // 若node不为尾节点,即为链表中间的节点 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //如果node的前驱节点不是头节点,那么需要给当前节点的后继节点一个"等待唤醒"的标记, //即将当前节点的前驱节点等待状态设置为SIGNAL,然后将其设置为当前节点的后继节点的前驱节点 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 唤醒node节点的后继节点。 unparkSuccessor(node); } // 让取消节点的next引用会指向自己 node.next = node; } } 复制代码
初始状态
当只有一个线程t1进行 lock()操作时,由于 tryAcquire()将返回 true,不用进行等待, 等待队列状态不变。若在 线程t1还未 unlock(), 线程t2就进行了 lock()操作,此时 等待队列将被初始化,并将 线程t2插入 等待队列:
此时,若线程t3也进行 lock()操作:
以上则是ReentrantLock的加锁(lock)机制,下面则是ReentrantLock的解锁(unlock)机制:
// ReentrantLock.unlock() public void unlock() { sync.release(1); } // Sync.release() public final boolean release(int arg) { // 尝试解锁,由子类ReentrantLock区定义 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒第一个节点 unparkSuccessor(h); return true; } return false; } // ReentrantLock.tryRelease() protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 仅当state = 0时,才算释放锁成功,即统一线程的lock()次数必须与unlock()次数相同 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // AbstractQueuedSynchronizer.unparkSuccessor() private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 寻找有效的后继节点 Node s = node.next; // 后继节点不存在,或状态为取消时,则查询最前面的一个非取消的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒对应节点锁在线程 LockSupport.unpark(s.thread); } // 一旦 LockSupport.unpark(s.thread);执行完,对应的等待节点将被唤醒: private final boolean parkAndCheckInterrupt() { // 唤醒后返回 LockSupport.park(this); return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { ... try { ... for (;;) { // 设置新的head节点,等待节点得到执行 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } } } finally { ... } } 复制代码
需要注意的是在唤醒节点第一步从头结点进行释放,如果后置节点存在cancel或者null,则在进行相关尾节点向前进行遍历获取最前面且不为cancel或者null的thread任务节点进行唤醒。
下图揭示了从同步器获取锁时,内部的等待队列的状态变化图:
当只有一个 线程t1进行 lock()操作时,由于 tryAcquire()将返回 true,不用进行等待, 等待队列状态不变。 若在 线程t1还未 unlock(), 线程t2就进行了 lock()操作,此时 等待队列将被初始化,并将 线程t2插入 等待队列:
此时,若 线程t3也进行 lock()操作:
以上,则是AbstractQueuedSynchronizer同步器的基本实现机制,其作为很多并发工具的基础,规范了如何阻塞和唤醒线程,相比普通的锁机制(如synchronized),其通过自旋等待和精确唤醒,可以提高一些并发时的性能。