AQS
AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。
源码分析
private transient volatile Node head; private transient volatile Node tail;
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
AQS的核心数据结构是一个双向链表(变量 head 和 tail),称为等待队列 。我们来看一下AQS 内部定义的 Node 节点里面都有哪些东西
常量 SHARED 和 EXCLUSIVE 是区分等待队列中的两种类型:独占节点(exclusive node)和共享节点(shared node)。独占节点用于独占式同步,共享节点用于共享式同步(例如Semaphore、CountDownLatch等)。接下来是几个状态量来表明当前 node 的状态:
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus;
这几个量都是 waitStatus 可能的值
- SIGNAL:当前节点的后继节点被阻塞(通过 park),因此当前节点在释放或取消时必须唤醒它的后继节点。为避免竞争,获取操作必须首先指示它们需要一个信号,然后重试原子获取操作,在失败时进行阻塞。
- CANCELLED:该节点由于超时或中断而被取消。节点一旦处于该状态,将永远不会再次阻塞。特别地,拥有被取消节点的线程将不再阻塞。
- CONDITION:该节点当前位于条件队列中。在被传输之前,它不会被用作同步队列节点。一旦被传输,该节点的状态将被设置为 0。(在这里使用该值与字段的其他用途无关,但简化了机制。)
- PROPAGATE:应该将 releaseShared 操作传播给其他节点。在 doReleaseShared 方法中,对于头节点,设置此值以确保传播继续,即使此后有其他操作介入。
- 0:上述情况均不满足。对于非负数值,表示节点无需发出信号。因此,大多数代码无需检查特定的值,只需检查 SIGNAL 即可。该字段对于普通同步节点初始化为 0,对于条件节点初始化为 CONDITION。可以使用 CAS(或在可能时,无条件的 volatile 写入)来修改该字段的值。
对于独占节点(exclusive node)和共享节点(shared node)的解释:
独占节点(exclusive node)和共享节点(shared node)。这两种节点分别用于实现独占式同步和共享式同步。在多线程环境下,AQS利用这些节点实现对锁和资源的安全管理和控制。
独占节点(exclusive node):
独占节点用于独占式同步,如ReentrantLock等可重入锁。在独占模式下,同一时刻只允许一个线程获取锁,其他线程需要等待,直到获取锁的线程释放锁。独占节点继承自AQS的内部类Node。
共享节点(shared node):
共享节点用于共享式同步,如CountDownLatch、Semaphore等。在共享模式下,允许多个线程同时获取资源,一般用于计数器等场景。共享节点同样继承自AQS的内部类Node。
acquire()
和release()
是 AQS 中的两个核心方法,用于实现同步器的获取和释放操作。它们是用来实现独占式同步的关键方法。1. acquire() 方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
arg
:表示获取同步状态的参数,具体含义由具体的同步器实现类决定。
acquire()
方法分为两个步骤:tryAcquire()
和acquireQueued()
。
tryAcquire()
:尝试以独占模式获取同步状态。该方法由具体的同步器实现类重写,用于尝试获取同步状态。如果成功获取了同步状态(即返回 true),则该方法直接返回。如果没有成功获取同步状态(即返回 false),则继续执行后续步骤。acquireQueued()
:将当前线程加入等待队列,并以独占模式获取同步状态。该方法会在等待队列中排队,并尝试获取同步状态,直到成功获取为止。它是一个自旋操作,即不断尝试获取锁直到成功。在等待队列中排队的过程中,如果线程被中断,则会退出自旋,并清除中断状态。addWaiter(Node mode)
:将当前线程以指定模式(独占模式或共享模式)加入等待队列。该方法会创建一个新的节点,并将其插入到等待队列的尾部。selfInterrupt()
:如果当前线程被中断,则自我中断。在等待队列中自旋的过程中,如果发现当前线程被中断,则会调用该方法中断自己,以响应外部的中断请求。2. release() 方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } return true; } return false; }
arg
:表示释放同步状态的参数,具体含义由具体的同步器实现类决定。
release()
方法分为两个步骤:tryRelease()
和unparkSuccessor()
。
tryRelease()
:尝试以独占模式释放同步状态。该方法由具体的同步器实现类重写,用于尝试释放同步状态。如果成功释放了同步状态(即返回 true),则继续执行后续步骤。如果没有成功释放同步状态(即返回 false),则不进行后续操作。unparkSuccessor(Node node)
:唤醒后继节点。当线程释放锁时,它会调用unparkSuccessor()
方法来唤醒等待队列中的后继节点,使其有机会继续尝试获取同步状态。该方法会将后继节点的等待状态设为 0(Node.SIGNAL)并尝试唤醒后继节点。
release()
方法通常在释放锁或资源时调用,它会首先尝试释放同步状态,如果成功释放,则唤醒等待队列中的后继节点,使其有机会获取同步状态。这些方法的实现会依赖具体的同步器类型,因为不同的同步器有不同的获取和释放逻辑。在使用 AQS 时,开发者需要根据具体的业务需求来实现这些方法,以实现线程的同步和协作。
自己手写一个共享锁
基于 AQS 实现一个共享式的同步锁,我们创建一个新的类 SharedLock。SharedLock 继承自 AQS 并实现tryAcquireShared()、tryReleaseShared() 方法
import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class SharedLock extends AbstractQueuedSynchronizer { // 构造方法,指定共享资源数量 public SharedLock(int resources) { setState(resources); } // 获取共享资源 public void acquireShared() { acquireShared(1); } // 释放共享资源 public void releaseShared() { releaseShared(1); } @Override protected int tryAcquireShared(int arg) { // 获取当前同步状态 int currentState = getState(); // 如果当前资源数量为0,或者请求资源数量大于当前资源数量,则返回负值,表示获取失败 if (currentState == 0 || arg > currentState) { return -1; } // 尝试更新同步状态,获取资源 int newState = currentState - arg; if (compareAndSetState(currentState, newState)) { return newState; } return -1; // 获取资源失败 } @Override protected boolean tryReleaseShared(int arg) { // 释放共享资源,直接增加同步状态 for (;;) { int currentState = getState(); int newState = currentState + arg; if (compareAndSetState(currentState, newState)) { return true; } } } }
- 构造方法
SharedLock(int resources)
:创建一个共享锁,并指定共享资源的数量。acquireShared()
方法:获取共享资源。如果当前没有可用的资源,则线程会进入等待状态。releaseShared()
方法:释放共享资源。tryAcquireShared(int arg)
方法:尝试获取共享资源。arg 表示请求的资源数量。如果当前可用的资源数量不足以满足请求,则返回负值表示获取失败,否则更新同步状态并返回新的资源数量。tryReleaseShared(int arg)
方法:尝试释放共享资源。arg 表示释放的资源数量。直接增加同步状态,表示释放资源。下面使用一下我们自定义的共享锁
我们创建了一个包含 5 个共享资源的 SharedLock 对象,并创建了 10 个线程来获取和释放共享资源。每个线程会尝试获取一个共享资源,然后执行一段休眠时间后再释放该资源。
public class Main { public static void main(String[] args) { int totalResources = 5; SharedLock sharedLock = new SharedLock(totalResources); Runnable runnable = () -> { sharedLock.acquireShared(); System.out.println(Thread.currentThread().getName() + " acquired the shared resource."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } sharedLock.releaseShared(); System.out.println(Thread.currentThread().getName() + " released the shared resource."); }; for (int i = 0; i < 10; i++) { Thread thread = new Thread(runnable); thread.start(); } } }
注意: 共享锁允许多个线程同时获取资源,只要可用资源数量足够。当可用资源数量不足时,线程将进入等待状态,直到有足够的资源为止
总结
AQS的底层原理:
- 基于CAS(Compare and Swap): AQS主要利用了CAS操作(即compareAndSet()方法)来实现对共享变量state的原子更新。CAS是一种乐观锁机制,通过比较当前值与期望值是否相等,若相等则执行更新操作,否则重新尝试。
- 等待队列: 等待队列是AQS实现线程同步的关键数据结构,它采用双向链表来存储等待线程节点。当线程需要获取锁时,如果锁被其他线程持有,则该线程会进入等待队列中,并挂起。当锁释放时,AQS会从等待队列中唤醒某个线程,使其重新尝试获取锁。
- 独占模式和共享模式: AQS支持两种同步模式:独占模式和共享模式。独占模式适用于只允许一个线程访问资源的场景,如ReentrantLock。共享模式适用于允许多个线程同时访问资源的场景,如Semaphore和CountDownLatch。