Java并发包(JUC)为我们提供了丰富的并发工具,其中不乏我们熟悉的ReentrantLock、Semaphore等。这些工具背后共同依赖于一个强大的基类——AbstractQueuedSynchronizer(简称AQS)。AQS作为一个构建锁和同步器的框架,能够简洁高效地创建出众多广泛应用的同步器,包括ReentrantLock、Semaphore,以及ReentrantReadWriteLock、SynchronousQueue、FutureTask等。此外,利用AQS,我们还可以轻松地定制符合自身需求的同步器,展现出其出色的灵活性和扩展性。
一、AQS的基本原理
AQS(AbstractQueuedSynchronizer)是Java并发编程中的一个重要组件,它为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器提供了一个基础框架。以下是AQS的详细原理阐述:
1. 基本思想
AQS的核心思想是基于一个volatile int state(表示资源的状态)的变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。它使用一个整数值来表示同步状态,并提供了一系列的方法来操作这个状态。AQS的内部实现依赖于一个FIFO队列来管理等待获取资源的线程,这个队列被称为"同步队列"。
2. 同步队列
同步队列是一个双向链表,遵循FIFO原则。每个节点(Node)都保存了线程的引用、线程状态、前驱节点和后继节点的信息。当线程获取同步状态失败时,AQS会将当前线程和等待状态等信息构造成为一个节点(Node)加入到同步队列,同时会阻塞当前线程。当同步状态释放的时候,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
3. 获取与释放资源
AQS 提供了两种资源获取方式:独占式(如ReentrantLock)和共享式(如Semaphore)。独占式资源在同一时间只能被一个线程获取,而共享式资源则可以被多个线程同时获取。
AQS的主要核心方法是acquire()和release()。其中acquire()方法用于获取锁,如果当前锁状态不可用,则当前线程会进入队列中等待。在一个线程成功获取到锁之后,它会负责维护锁状态并控制队列中等待线程的顺序。release()方法则用于释放锁。当一个线程释放锁之后,它会通知队列中的下一个线程去获取锁。
4. 自定义同步器
AQS作为一个框架,它本身并不直接提供锁或其他同步工具的具体实现,而是通过继承AQS并实现其提供的方法来创建具体的同步器。例如,ReentrantLock、Semaphore、CountDownLatch等都是基于AQS实现的。
AQS通过将复杂的同步问题抽象为对状态的获取和释放,简化了多线程编程的复杂性。同时,AQS的灵活性和可扩展性使得它成为构建各种同步工具的基础。在实际应用中,理解AQS的工作原理和机制对于编写高效、可靠的并发代码至关重要。通过合理地使用基于AQS的同步器,我们可以更好地控制和协调多线程对共享资源的访问,从而提升系统的整体性能和稳定性。
二、AQS的源码及核心方法分析
以下代码段是从Java的AQS类中提取的,可能不是完整的,并且不包含所有方法和内部类。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 同步队列中的节点 static final class Node { // 节点状态;取消、信号、条件、传播等 volatile int waitStatus; // 节点持有的线程 volatile Thread thread; // 节点的前驱节点,比如在CLH队列中的前一个节点 Node prev; // 节点的后继节点 Node next; // ... 其他方法和构造函数 } // 同步状态 private volatile int state; // 同步队列的头节点 private transient volatile Node head; // 同步队列的尾节点 private transient volatile Node tail; // 获取同步状态的方法,由子类实现 protected abstract int tryAcquire(int acquires); // 释放同步状态的方法,由子类实现 protected abstract boolean tryRelease(int releases); // 尝试以独占方式获取资源 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 尝试以共享方式获取资源 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 释放独占资源 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 释放共享资源 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // ... 其他方法和内部类,如ConditionObject等 // 添加节点到队列,省略了部分细节 private Node addWaiter(Node mode) { // ... 省略了代码,这里负责创建节点并加入队列 } // 独占式获取资源的自旋方法 private final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // ... 省略了部分代码,包括检查中断和park/unpark线程的逻辑 } } finally { if (failed) cancelAcquire(node); } } // ... 其他自旋方法,如doAcquireShared等 // 唤醒后继节点 private void unparkSuccessor(Node node) { // ... 省略了代码,这里负责唤醒后继节点 } // ... 其他方法和内部逻辑 }
请注意,上面的代码只是一个摘要,并没有包含AQS类的所有细节。特别是,它省略了条件队列(ConditionObject)的实现、线程中断和取消的逻辑、Unsafe类的使用以及许多其他重要的细节。AQS的实际源码要复杂得多,并且包含大量的优化和错误处理逻辑。如果你想要查看AQS的完整源码,我建议你去查看Java标准库中的java.util.concurrent.locks.AbstractQueuedSynchronizer类的源码。这将给你一个全面而深入的了解,包括它是如何实现锁和其他同步原语的。
对AQS源码的简要分析:
1. 内部类与属性
AQS内部定义了一个继承自AbstractOwnableSynchronizer的静态内部类Sync,这个类通常被具体的同步器(如ReentrantLock)继承。此外,AQS还定义了一个双向链表节点类Node,用于构造同步队列。
AQS有几个关键的属性:
volatile int state:表示同步状态,这是一个volatile变量,保证了其修改对多线程的可见性。
Node head、Node tail:分别指向同步队列的头节点和尾节点。
2. 获取资源
AQS提供了两种资源获取方式:独占式和共享式。
独占式获取资源的方法主要有acquire和tryAcquire。acquire是一个模板方法,它首先尝试调用tryAcquire方法获取资源,如果失败则会将当前线程加入同步队列并阻塞。tryAcquire方法需要由具体的同步器实现。
共享式获取资源的方法主要有acquireShared和tryAcquireShared。acquireShared同样是一个模板方法,它调用tryAcquireShared方法尝试获取资源,并根据返回值决定是否需要将当前线程加入同步队列。
3. 释放资源
AQS同样提供了独占式和共享式的资源释放方法。
独占式释放资源的方法主要有release和tryRelease。release方法会调用tryRelease来尝试释放资源,如果成功则会唤醒同步队列中的头节点线程。tryRelease方法需要由具体的同步器实现。
共享式释放资源的方法主要有releaseShared和tryReleaseShared。releaseShared方法会调用tryReleaseShared来尝试释放资源,并根据返回值决定是否唤醒后续节点线程。
4. 同步队列
AQS内部维护了一个基于Node节点的FIFO同步队列。当一个线程尝试获取资源失败时,它会被封装成一个Node节点并加入到同步队列的尾部。当资源被释放时,队列中的头节点线程会被唤醒并尝试重新获取资源。
5. 条件队列
AQS还提供了条件队列(Condition Queue)的支持,允许线程在特定条件下等待。每个Condition对象都关联一个条件队列,用于存放等待特定条件的线程。await和signal等方法用于操作条件队列。
AQS的源码实现了一个灵活且可扩展的同步框架,通过内部状态、同步队列和条件队列等机制,支持多种同步场景。开发者可以通过继承AQS并实现相关方法来构建自定义的同步器,满足特定的并发需求。AQS的设计体现了Java并发编程的精髓,是学习和理解Java并发编程的重要部分。
AQS的几个核心方法:QS(AbstractQueuedSynchronizer)的核心方法主要涉及资源的获取和释放,这些方法是实现各种同步器的基础。下面详细解释AQS的几个核心方法:
acquire(int arg):
此方法是独占式获取同步状态的方法,也就是说,在同一时刻仅有一个线程能够获取到同步状态。
它首先尝试调用自定义同步器实现的tryAcquire方法来获取资源,如果成功则立即返回。如果失败,则会将当前线程加入到CLH同步队列的尾部,并使其进入等待状态。
该方法对中断不敏感,即由于线程获取同步状态失败而加入到CLH同步队列中后,后续对线程进行中断操作时,线程不会从同步队列中移除。
release(int arg):
此方法是独占式释放同步状态的方法。
它会调用自定义同步器实现的tryRelease方法来尝试释放资源,如果成功,则会唤醒等待队列中的头节点线程,使其有机会重新尝试获取同步状态。
该方法通常用于当资源已经被占用后的释放操作,确保资源能够被其他等待线程公平地获取。
acquireShared(int arg):
此方法是共享式获取同步状态的方法,允许多个线程同时获取资源。
与acquire方法类似,它首先尝试调用自定义同步器实现的tryAcquireShared方法来获取资源。如果返回值小于0,表示获取失败;如果等于0,表示获取成功但没有剩余资源;如果大于0,表示获取成功且有剩余资源。
如果获取失败,当前线程也会被加入到等待队列中,但不同的是,由于是共享模式,即使前一个线程获取资源后仍有剩余,后续线程也可能继续获取。
releaseShared(int arg):
此方法是共享式释放同步状态的方法。
它会调用自定义同步器实现的tryReleaseShared方法来尝试释放资源,并返回一个表示释放后剩余资源的值。
如果释放后仍有剩余资源,那么等待队列中的其他线程将有机会继续获取资源。
这些方法通常需要在自定义同步器中进行实现,以满足特定的同步需求。AQS通过这些方法提供了一种灵活且可扩展的同步机制,使得开发者能够轻松地构建出符合自己需求的同步器。
三、AQS的应用
AQS作为一个框架,它本身并不直接提供锁或其他同步工具的具体实现,而是通过继承AQS并实现其提供的方法来创建具体的同步器。基于AQS,Java并发包提供了多种同步工具,包括
ReentrantLock、Semaphore、CountDownLatch和CyclicBarrier等。以下是这些工具使用AQS的原理:
ReentrantLock(可重入锁)
ReentrantLock是一个互斥锁,它利用AQS来实现锁的获取和释放。在AQS中,state字段表示锁是否被任何线程持有,以及被持有的次数(对于可重入锁)。当一个线程首次获取锁时,AQS会将state设置为占用状态,并记录当前线程为锁的持有者。如果同一个线程再次获取锁,state会递增,表示重入次数增加。当线程释放锁时,state递减,直到为0时表示锁完全释放,其他线程可以获取锁。
Semaphore(信号量)
Semaphore用于控制对一组资源的访问,它内部使用AQS来维护一个许可的计数器。在AQS中,state字段表示当前可用的许可数量。线程通过调用acquire方法来获取一个许可,如果许可可用(state > 0),则获取成功并将state递减;如果许可不可用,则线程会进入等待队列。线程释放许可时调用release方法,将state递增,并可能会唤醒等待队列中的线程。
CountDownLatch(倒计时门闩)
CountDownLatch允许一个或多个线程等待其他线程完成一组操作。在AQS中,state字段表示还需要等待的事件数量(即计数器的初始值)。每当一个事件完成时,计数器递减(通过countDown方法),直到计数器达到0。等待的线程通过调用await方法来阻塞自己,直到计数器为0时才会被唤醒并继续执行。
CyclicBarrier(循环屏障)
CyclicBarrier用于让一组线程在继续执行之前互相等待,直到所有线程都达到某个屏障点。AQS在CyclicBarrier中用于维护已经到达屏障的线程数量和一个表示屏障是否已经被打破的状态。在AQS中,state字段的含义与CountDownLatch类似,但CyclicBarrier可以重复使用,即一组线程可以多次在所有线程都达到屏障点后继续执行。当最后一个线程到达屏障点时,屏障会打开,所有等待的线程会被唤醒,同时可以选择执行一个屏障动作。
FutureTask
FutureTask是一个可取消的异步计算任务,它实现了Future接口,表示异步计算的结果。FutureTask可以处于三种状态:未开始、已完成和已取消。AQS在FutureTask中用于管理这些状态以及处理线程的阻塞和唤醒。
在FutureTask中,AQS的state字段表示任务的状态。当任务还未开始时,state通常会被设置为一个初始值。当任务开始执行时,执行任务的线程会尝试通过AQS的获取(acquire)方法来获取任务执行的权利,这个方法会根据state字段的值来决定是立即返回、阻塞当前线程还是抛出异常。
如果任务已经完成或被取消,AQS的获取方法会根据情况立即返回或抛出异常。如果任务还未完成,执行任务的线程会开始执行任务,并在任务完成后通过AQS的释放(release)方法来更新state字段的值,并唤醒其他可能正在等待任务结果的线程。
其他线程可以通过调用FutureTask的get方法来获取任务的结果。如果任务还未完成,调用get方法的线程会被阻塞,直到任务完成并被其他线程唤醒。这种阻塞和唤醒的机制是通过AQS的等待队列来实现的。
SynchronousQueue
SynchronousQueue是一个没有存储空间的阻塞队列,它每个插入操作必须等待一个相应的删除操作,反之亦然。AQS在SynchronousQueue中用于实现这种阻塞的插入和删除操作。
在SynchronousQueue中,当一个线程尝试向队列中插入一个元素时,它会调用AQS的获取方法,这个方法会根据队列的当前状态来决定是立即返回、阻塞当前线程还是抛出异常。如果队列为空,插入操作会阻塞当前线程,直到有另一个线程从队列中删除一个元素。
类似地,当一个线程尝试从队列中删除一个元素时,它会调用AQS的获取方法。如果队列非空(即有一个元素等待被删除),删除操作会立即返回该元素。否则,删除操作会阻塞当前线程,直到有另一个线程向队列中插入一个元素。
在SynchronousQueue中,AQS的state字段可能不直接用于表示队列中元素的数量,因为队列本身没有存储空间。相反,state字段可能用于表示队列的阻塞状态或其他同步信息。具体的实现细节可能会因不同的SynchronousQueue实现而有所不同。
在所有这些工具中,AQS提供了核心的同步机制,包括状态的原子性操作、线程的阻塞与唤醒以及等待队列的管理。这些工具通过扩展AQS并覆盖其部分方法来实现各自特定的同步语义。
四、自定义同步器
上面大概讲了一些关于AQS如何使用的理论性的东西,接下来,我们就来看下实际如何使用,通过继承AQS,并重写其提供的核心方法,我们可以创建自定义的同步器来满足特定的需求。
以下是一个简单的示例,展示了如何实现一个基于AQS的互斥锁(Mutex)。
import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class Mutex { // 自定义同步器,继承AQS private static class Sync extends AbstractQueuedSynchronizer { // 是否处于占用状态 protected boolean isHeldExclusively() { return getState() == 1; } // 当状态为0的时候才设置状态为1,不可重入! public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { // state为0才设置为1,不可重入! setExclusiveOwnerThread(Thread.currentThread()); // 设置为当前线程独占资源 return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); // state置0 return true; } // 提供一个条件变量 Condition newCondition() { return new ConditionObject(); } // 唤醒等待在锁上的线程 final boolean hasQueuedThreads() { return head != tail; } // 唤醒等待在锁上的线程 final void unlock() { release(1); } // 锁是否被占用 final boolean isLocked() { return getState() == 1; } } // 同步器对象 private final Sync sync = new Sync(); // 锁是否被占用 public boolean isLocked() { return sync.isLocked(); } // 尝试获取锁 public boolean tryLock() { return sync.tryAcquire(1); } // 获取锁 public void lock() { sync.acquire(1); } // 尝试释放锁 public boolean tryUnlock() { return sync.tryRelease(1); } // 释放锁 public void unlock() { sync.unlock(); } // 唤醒等待在锁上的线程 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // 提供一个条件变量 public Condition newCondition() { return sync.newCondition(); } }
在这个例子中,我们创建了一个名为Mutex的类,它内部定义了一个静态内部类Sync,该类继承自AbstractQueuedSynchronizer。我们重写了以下几个关键方法:
tryAcquire:尝试获取锁,如果当前状态为0(表示锁未被占用),则将其设置为1,并将当前线程设置为独占资源的线程。
tryRelease:尝试释放锁,将状态设置为0,并清除独占资源的线程。
isLocked:检查锁是否被占用,通过检查状态是否为1来判断。
Mutex类本身提供了更高级别的lock、tryLock、unlock和tryUnlock等方法,这些方法内部调用Sync对象上相应的方法来实现互斥锁的功能。此外,我们还提供了isLocked和hasQueuedThreads方法来检查锁的状态和是否有线程在等待获取锁。
请注意,这个示例是一个简单的互斥锁实现,它不支持重入锁(即一个线程在已经持有锁的情况下再次获取锁)。在实际应用中,你可能需要根据你的需求来调整tryAcquire和tryRelease方法的实现。
此外,上面的示例中newCondition方法直接返回了AQS内部的ConditionObject实例,这使得Mutex的使用者可以创建条件变量,并与锁一起使用。这是AQS提供的一个强大功能,允许你在等待特定条件时挂起和唤醒线程。
下面是一个测试应用的例子:
public class MutexTest { private static final int NUM_THREADS = 5; private static Mutex mutex = new Mutex(); private static int counter = 0; public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; i++) { threads[i] = new Thread(() -> { for (int j = 0; j < 1000; j++) { mutex.lock(); try { counter++; System.out.println(Thread.currentThread().getName() + ": Counter = " + counter); } finally { mutex.unlock(); } } }); } // Start all threads for (Thread thread : threads) { thread.start(); } // Wait for all threads to finish for (Thread thread : threads) { thread.join(); } System.out.println("Final Counter Value: " + counter); } }
在这个测试应用中,我们创建了一个名为MutexTest的类,它有一个静态的Mutex实例和一个共享的counter变量。我们创建了5个线程,每个线程都会尝试获取mutex锁,然后递增counter变量,并打印当前线程的名称和计数器的值。为了确保计数器操作的原子性,我们在mutex.lock()和mutex.unlock()之间执行这些操作。
注意,在finally块中释放锁是很重要的,以确保即使在发生异常的情况下锁也能被正确释放。
运行这个程序,你应该会看到每个线程按顺序(或几乎按顺序,取决于线程调度)打印出计数器的值,并且最终的计数器值应该是5000(因为每个线程递增计数器1000次)。
这个程序展示了如何使用自定义的Mutex类来同步多个线程对共享资源的访问,从而避免竞态条件和数据不一致的问题。
五、总结
AQS作为Java并发编程中的一个重要组件,提供了强大的同步机制。它通过将复杂的同步问题抽象为对状态的获取和释放,简化了多线程编程的复杂性。同时,AQS的灵活性和可扩展性使得它成为构建各种同步工具的基础。
在实际应用中,理解AQS的工作原理和机制对于编写高效、可靠的并发代码至关重要。通过合理地使用基于AQS的同步器,我们可以更好地控制和协调多线程对共享资源的访问,从而提升系统的整体性能和稳定性。