在Java并发编程的世界里,AbstractQueuedSynchronizer(简称AQS)是当之无愧的核心基石。从ReentrantLock到CountDownLatch,从Semaphore到CyclicBarrier,几乎所有常用的同步工具类都构建在AQS之上。本文将从核心架构入手,逐步拆解AQS的底层实现原理,并结合ReentrantLock、CountDownLatch、Semaphore的源码逻辑,带你彻底掌握这一并发编程的核心密钥。
一、AQS 核心架构
AQS位于java.util.concurrent.locks包,是一个用于构建锁和同步器的抽象框架。其核心设计围绕“同步状态”和“等待队列”展开,主要包含以下三大组件:
1.1 核心组件概览
1.2 同步状态:volatile int state
state是AQS的核心变量,用于表示同步状态:
- 当
state=0时,表示无锁状态; - 当
state>0时,表示有锁状态,数值可表示重入次数; - 由
volatile修饰,保证多线程间的可见性。
1.3 等待队列:CLH虚拟双向队列
CLH队列是AQS实现线程阻塞与唤醒的核心数据结构:
- 是一个虚拟的双向队列(没有实际的队列类,只有节点间的关联);
- 当线程获取锁失败时,会被封装成
Node节点加入队列尾部; - 当锁释放时,会唤醒队列中的后继节点。
1.4 节点类:Node
Node是AQS的内部类,用于封装等待线程及其状态,核心属性如下:
| 属性 | 类型 | 说明 |
thread |
Thread |
等待的线程 |
waitStatus |
int |
等待状态,包含以下取值: - CANCELLED(1):线程取消等待- SIGNAL(-1):后继线程需要被唤醒- CONDITION(-2):线程在Condition队列- PROPAGATE(-3):共享模式下传播- 0:初始状态 |
prev |
Node |
前驱节点 |
next |
Node |
后继节点 |
二、AQS 底层实现原理
AQS支持两种同步模式:独占模式(Exclusive,同一时间只有一个线程能获取锁,如ReentrantLock)和共享模式(Shared,同一时间多个线程能获取锁,如CountDownLatch、Semaphore)。
2.1 独占模式(Exclusive)
2.1.1 获取锁:acquire(int arg)
acquire是独占模式下获取锁的核心方法,其流程如下:
核心步骤解析:
- **
tryAcquire(int arg)**:由子类实现,尝试直接获取锁,成功返回true,失败返回false。 - **
addWaiter(Node mode)**:将当前线程封装成Node,通过CAS操作加入队列尾部,保证线程安全。 - **
acquireQueued(Node node, int arg)**:
- 自旋尝试获取锁(只有前驱节点是头节点时才尝试);
- 若失败则调用
LockSupport.park()阻塞当前线程,直到被前驱节点唤醒或中断; - 返回等待过程中是否被中断。
- **
selfInterrupt()**:若等待过程中被中断,重新设置线程的中断标志位。
2.1.2 释放锁:release(int arg)
release是独占模式下释放锁的核心方法,其流程如下:
核心步骤解析:
- **
tryRelease(int arg)**:由子类实现,尝试释放锁,成功返回true(通常当state减到0时)。 - **
unparkSuccessor(Node node)**:找到队列中的后继节点,调用LockSupport.unpark()唤醒它。
2.2 共享模式(Shared)
2.2.1 获取锁:acquireShared(int arg)
acquireShared是共享模式下获取锁的核心方法,其流程如下:
核心步骤解析:
- **
tryAcquireShared(int arg)**:由子类实现,返回值>=0表示获取成功,<0表示失败。 - **
doAcquireShared(int arg)**:
- 自旋尝试获取锁;
- 若失败则阻塞,直到被唤醒;
- 获取成功后,会传播唤醒后续的共享节点(这是与独占模式的核心区别)。
2.2.2 释放锁:releaseShared(int arg)
releaseShared是共享模式下释放锁的核心方法,其流程如下:
核心步骤解析:
- **
tryReleaseShared(int arg)**:由子类实现,释放成功返回true。 - **
doReleaseShared()**:传播唤醒队列中的后继共享节点,保证多个线程能同时被唤醒。
三、基于 AQS 实现 ReentrantLock
ReentrantLock是可重入的独占锁,其内部通过继承AQS的Sync抽象类实现核心逻辑,并分为FairSync(公平锁)和NonfairSync(非公平锁)两个子类。
3.1 核心逻辑
state含义:表示锁的持有次数,0表示无锁,>=1表示重入次数。tryAcquire实现:
- 非公平锁:先直接CAS尝试获取锁,失败则检查是否当前线程已持有锁,是则
state+1。 - 公平锁:先检查队列中是否有前驱节点(
hasQueuedPredecessors()),没有才尝试获取锁。
tryRelease实现:state-1,当state减到0时,释放锁成功,返回true。
3.2 代码示例
package com.jam.demo.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock 示例
*
* @author ken
*/
@Slf4j
public class ReentrantLockDemo {
private static final ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
increment();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
log.info("Final count: {}", count);
}
/**
* 递增方法,使用 ReentrantLock 保证原子性
*/
private static void increment() {
lock.lock();
try {
count++;
log.info("Thread {} increment count to {}", Thread.currentThread().getName(), count);
} finally {
lock.unlock();
}
}
}
四、基于 AQS 实现 CountDownLatch
CountDownLatch是一个倒计时器,允许一个或多个线程等待其他线程完成操作,基于AQS的共享模式实现。
4.1 核心逻辑
state含义:初始化为需要等待的线程数count。tryAcquireShared实现:判断state是否为0,是则返回1(表示可以通过),否则返回-1(表示需要等待)。tryReleaseShared实现:通过CAS将state减1,当state减到0时,返回true,触发传播唤醒所有等待线程。
4.2 代码示例
package com.jam.demo.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch 示例
*
* @author ken
*/
@Slf4j
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
executeTask(taskId);
} finally {
latch.countDown();
}
}).start();
}
log.info("Main thread waiting for all tasks to complete...");
latch.await();
log.info("All tasks completed! Main thread resumes.");
}
/**
* 执行任务
*
* @param taskId 任务ID
*/
private static void executeTask(int taskId) {
log.info("Task {} started", taskId);
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Task {} interrupted", taskId, e);
}
log.info("Task {} completed", taskId);
}
}
五、基于 AQS 实现 Semaphore
Semaphore是信号量,用于控制同时访问特定资源的线程数量,基于AQS的共享模式实现。
5.1 核心逻辑
state含义:表示可用的许可证数量。tryAcquireShared实现:通过CAS尝试获取许可证,将state减arg,若结果>=0则返回剩余数量,否则返回-1。tryReleaseShared实现:通过CAS释放许可证,将state加arg,返回true。
5.2 代码示例
package com.jam.demo.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
/**
* Semaphore 示例
*
* @author ken
*/
@Slf4j
public class SemaphoreDemo {
private static final int PERMIT_COUNT = 3;
private static final Semaphore semaphore = new Semaphore(PERMIT_COUNT);
public static void main(String[] args) {
int threadCount = 10;
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> accessResource(threadId)).start();
}
}
/**
* 访问资源,使用 Semaphore 限制并发数
*
* @param threadId 线程ID
*/
private static void accessResource(int threadId) {
try {
semaphore.acquire();
log.info("Thread {} acquired permit, accessing resource", threadId);
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread {} interrupted", threadId, e);
} finally {
semaphore.release();
log.info("Thread {} released permit", threadId);
}
}
}
六、总结
AQS通过“volatile state + CLH队列”的核心架构,巧妙地实现了线程的同步与协作。其核心思想是:
- 用
state控制同步状态,保证可见性; - 用CLH队列管理等待线程,保证公平性或高效性;
- 通过子类实现
tryAcquire/tryRelease等方法,灵活定义同步逻辑。
理解AQS的架构与原理,不仅能帮助我们更好地使用ReentrantLock、CountDownLatch等工具类,还能为我们自定义同步器提供坚实的理论基础。