Java 并发编程之AQS
AbstractQueuedSynchronizer (AQS) 是 Java 并发编程中的一个核心框架,广泛用于构建锁和其他同步器(如信号量、读写锁等)。它是 java.util.concurrent.locks 包的一部分。AQS 的设计目的是简化并发同步器的实现。理解 AQS 对于深入理解 Java 并发编程非常重要。
AQS 的基本原理
1. 核心数据结构
AQS 主要依赖一个 FIFO 队列(先进先出队列)来管理获取锁的线程。它的核心数据结构包括:
- State:一个整型变量,表示同步状态。具体含义取决于具体的同步器(如 ReentrantLock、
- Semaphore 等)。
- 队列节点(Node):用来表示请求共享资源的每个线程。
- 队列头(head)和尾(tail):指向等待队列的头和尾节点。
2. 独占模式和共享模式
AQS 支持两种锁模式:
- 独占模式:只有一个线程能占有资源。例如 ReentrantLock。
- 共享模式:多个线程可以共享资源。例如 Semaphore 和 CountDownLatch。
3. 关键方法
AQS 提供了一些需要子类实现的方法(如 tryAcquire、tryRelease 等),子类通过实现这些方法来定义具体的同步逻辑。
AQS 的主要方法
1. 独占模式
- acquire(int arg):尝试获取资源,如果失败则将线程加入等待队列并阻塞。
- release(int arg):释放资源,唤醒等待队列中的下一个节点(如果有)。
2. 共享模式
- acquireShared(int arg):尝试获取共享资源,如果失败则将线程加入等待队列并阻塞。
- releaseShared(int arg):释放共享资源,唤醒等待队列中的下一个节点(如果有)。
3. 队列操作
- addWaiter(Node mode):将当前线程加入等待队列。
- unparkSuccessor(Node node):唤醒等待队列中的下一个节点。
内部类 Node
AQS 内部使用一个 Node 类表示等待队列中的每个节点。Node 类的定义如下
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
具体实现细节
1. 获取独占锁
尝试获取独占锁,如果获取失败,则将当前线程加入等待队列并阻塞:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire 方法的主要流程是:
- 调用 tryAcquire 尝试获取锁(需要子类实现)。
- 如果获取失败,调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 使线程进入等待状态。
tryAcquire(int arg)
尝试获取锁,需要子类实现。例如,ReentrantLock 中的实现如下:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
2. 释放独占锁
release 方法是释放独占锁的入口:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
release 方法的主要流程是:
- 调用 tryRelease 尝试释放锁(需要子类实现)。
- 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。
tryRelease(int arg)
尝试释放锁,需要子类实现。例如,ReentrantLock 中的实现如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
3. 获取共享锁
尝试获取共享锁,如果获取失败,则将当前线程加入等待队列并阻塞:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
acquireShared 方法的主要流程是:
- 调用 tryAcquireShared 尝试获取共享锁(需要子类实现)。
- 如果获取失败,调用 doAcquireShared 使线程进入等待状态。
tryAcquireShared(int arg)
尝试获取共享锁,需要子类实现。例如,Semaphore 中的实现如下:
protected int tryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
4. 释放共享锁
releaseShared 方法是释放共享锁的入口:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
releaseShared 方法的主要流程是:
- 调用 tryReleaseShared 尝试释放共享锁(需要子类实现)。
- 如果释放成功,唤醒等待队列中的下一个节点(如果存在)。
tryReleaseShared(int arg)
尝试释放共享锁,需要子类实现。例如,Semaphore 中的实现如下:
protected boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
AQS 内部实现细节
1. addWaiter(Node mode)
将当前线程封装成节点并加入等待队列:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter 方法尝试快速将节点加入队列的尾部,如果失败则调用 enq 方法进行完整的入队操作。
2. enq(Node node)
将节点加入等待队列:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq 方法通过自旋(无限循环)将节点加入队列,确保线程安全。
2. acquireQueued(final Node node, int arg)
使线程进入等待状态,直到获取到锁:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
cquireQueued 方法的主要流程是:
- 尝试获取锁,如果成功则将节点设置为队列头。
- 如果获取失败,则检查是否应该挂起线程(通过 shouldParkAfterFailedAcquire)。
- 挂起线程(通过 parkAndCheckInterrupt),直到被唤醒。
3. shouldParkAfterFailedAcquire(Node pred, Node node)
检查是否应该挂起线程:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
shouldParkAfterFailedAcquire 方法检查前驱节点的状态,如果前驱节点的状态是 SIGNAL,表示当前线程可以安全地挂起。
4. parkAndCheckInterrupt()
挂起线程并检查中断状态:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt 方法使用 LockSupport.park 挂起线程,并在被唤醒后返回中断状态。
详细示例:ReentrantLock
以下是基于 AQS 实现 ReentrantLock 的示例:
import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class MyReentrantLock { private final Sync sync = new Sync(); private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } @Override protected boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } @Override protected boolean isHeldExclusively() { return getState() != 0 && getExclusiveOwnerThread() == Thread.currentThread(); } } public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); } public boolean isLocked() { return sync.isHeldExclusively(); } }
结论
AQS 是 Java 并发框架中用于实现锁和其他同步器的基础组件。它通过一个 FIFO 队列管理获取锁的线程,并提供模板方法让子类实现具体的同步逻辑。理解 AQS 的工作原理有助于深入理解 Java 并发编程及其高性能实现。