抽象同步队列 AQS 解析
AQS——锁的底层支持
AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件,并发包中的锁底层都是使用 AQS 来实现的,下面看下 AQS 的类图结构。
该图可知,AQS 是一个FIFO
的双向队列,其内部通过节点 head 和 tail 记录队首和队尾的元素,队列元素类型为Node
。
其中 Node 里的 thread 变量用来存放进入 AQS 队列里的线程,而 SHARED 用来标记线程是获取共享资源时被阻塞挂起放入 AQS 队列的;EXCLUSIVE 用来标记该线程是获取独占资源时被挂起放入 AQS 队列中;waitStatus 记录当前线程等待状态,可以为CANCELLED(线程取消)
、SIGNAL(线程需要被唤醒)
、CONDITION(线程在条件队列中等待)
、PROPAGATE(释放共享资源时通知其他节点)
;prev 记录当前节点的前驱节点,next 则是后驱节点。
在 AQS 中维持了一个单一的状态信息state
,可以通过 getState、setState、compareAndSetState 函数修改值。
- 对于 ReentrantLock 的实现,state 可以表示当前线程获取锁的次数;
- 对于读写锁 ReentrantReadWriteLock,state 的高 16 位表示读状态,也就是获取该锁的次数,低 16 位表示获取到写锁线程可重入的次数;
- 对于 Semaphore 来说,state 表示当前可用信号的个数;
- 对于 CountDownlatch 来说,state 用来表示计数器当前的值;
AQS 有个内部类 ConditionObject,它用来结合锁实现线程同步。ConditionObject 可以直接访问 AQS 对象内部的变量,比如 state 状态值和队列。
ConditionObject 是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来存放调用条件变量的 await 方法后被阻塞的线程,而 firstWaiter 表示队首元素,lastWaiter 表示队尾元素。
这里我们先说一下 waitStatus 所表示的几个状态。
- CANCELLED(值为:1):表示当前节点已取消调度。当 timeout 或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的节点将不会再变化。
- SIGNAL(值为:-1):表示后继节点在等待当前节点唤醒。后继节点入队时,会将前继节点的状态更新为 SIGNAL。
- CONDITION(值为:-2):表示节点等待在 Condition 上,当其他线程调用了 Condition 的 signal()方法后,CONDITION 状态的节点将从条件队列转移到同步队列中,等待获取同步锁。
- PROPAGATE(值为:-3):共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
- 值为:0:新节点入队时的默认状态。
对于 AQS 来说,线程同步的关键就是对状态值 state 进行操作,根据 state 是否属于一个线程,操作 state 的方式分为独占和共享。
独占方式下获取资源通过:void acquire(int arg)
、void acquireInterruptibly(int arg)
。
独占方式下释放资源通过:boolean release(int arg)
。
共享方式下获取资源通过:void acquireShared(int arg)
、void acquireSharedInterruptibly(int arg)
。
共享方式下释放资源通过:boolean releaseShared(int arg)
。
在独占方式中获取资源与具体线程绑定的,也就是说如果一个线程获取到资源就会标记是这个线程获取到了,其他线程再通过操作 state 获取资源就会发现该资源不是自己持有的,随后阻塞。
比如独占锁 ReentrantLock 的实现中:当一个线程获取到了 ReentrantLock 锁,在 AQS 内部首先使用 CAS 操作将 state 值从 0 改为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从 1 改为 2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
而共享方式的获取资源是和具体线程不相关的,当多个线程去请求资源时通过 CAS 获取资源,当一个线程获取到资源后,别的线程再去获取时如果当前资源还可以满足需要的话,则只需要通过 CAS 方式获取即可。
比如 Semaphore 信号量,当一个线程通过 acquire 方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则将当前线程放入阻塞队列,满足则通过自旋 CAS 获取信号量。
在独占方式中,获取和释放资源流程如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
当一个线程调用了 acquire 方法获取独占资源时,首先使用 tryAcquire 方法尝试获取资源,具体就是设置状态变量 state 的值,成功即直接返回;失败的话则将当前线程封装为类型为 Node.EXCLUSIVE 的 Node 节点随后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this)挂起自己。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
当一个线程调用 release 方法时会尝试使用 tryRelease 操作释放资源,这里也是设置状态变量 state 的值,随后调用 LockSupport.unpark(thread) 方法激活 AQS 队列中被阻塞的一个线程。被激活的线程则使用 tryAcquire 尝试,看当前变量 state 值是否还能满足自己的需要,满足则继续向下执行,否则还是被放入队列中挂起。
📢 需要注意:AQS 类并没有提供 tryAcquire、tryRelease 方法,需要由具体子类来实现,根据不同场景使用 CAS 算法尝试修改 state 状态值,并且 state 状态值的增减代表什么意义。
比如继承自 AQS 实现的独占锁 ReentrantLock,当 status 为 0 时表示锁空闲,为 1 时表示锁已经被占用。在重写 tryAcquire 时,在内部需要使用 CAS 算法查看当前 state 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前锁的持有者为当前线程,然后返回 true,如果 CAS 失败则返回 false。
在共享方式中,获取和释放资源流程如下:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
当线程调用 acquireShared 获取共享资源时,会首先通过 tryAcquireShared 来尝试获取资源,具体还是设置状态变量 state 的值,成功直接返回,失败则将当前线程封装为类型 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列尾部,并使用 LockSupport.park(this)挂起自己。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
当线程调用 releaseShared 时还是通过尝试 tryReleaseShared 方法来释放资源,也是设置状态变量 state 的值,随后使用 LockSupport.unpark(thread)来激活 AQS 阻塞队列中被阻塞的一个线程,被激活的线程使用 tryReleaseShared 方法查看当前 state 是否还满足自己需要,满足则激活线程继续向下执行,否则还是被放入 AQS 队列中并被挂起。
📢 同样需要注意,AQS 类并没有提供可用的 tryAcquireShared、tryReleaseShared 方法,需要子类去实现。
比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false,否则使用 CAS 递增 state 的高 16 位(在 ReentrantReadWriteLock 中,state 的高 16 位为获取读锁的次数)。
⚠️ 基于 AQS 实现的锁除了需要重写上述介绍的方法外,还需要重写 isHeldExclusively 方法,来判断锁是被当前线程独占还是被共享。
另外我们发现acquireInterruptibly(int arg)
、acquireSharedInterruptibly(int arg)
都带有 Interruptibly 关键字。那么带和不带这个关键字有什么区别?
其实不带 Interruptibly 关键字方法表示不对中断进行响应,也就是线程在调用不带 Interruptibly 的方法获取资源或者获取失败被挂起时,其他线程中断该线程,那么该线程不会因为被中断而抛出异常,继续获取资源或被挂起,也就是不对终端进行响应,忽略中断。
而带 Interruptibly 关键字则是会抛出 InterruptedException 异常并返回。
下面我们看一下 AQS 如何维护队列,主要查看入队操作。
当一个线程获取锁失败后该线程会被转换为 Node 节点,然后使用 enq(final Node node)方法将该节点插入到 AQS 阻塞队列。
private Node enq(final Node node) { for (;;) { Node t = tail;//(1) if (t == null) { // Must initialize if (compareAndSetHead(new Node()))//(2) tail = head; } else { node.prev = t;//(3) if (compareAndSetTail(t, node)) {//(4) t.next = node; return t; } } } }
如上代码,在第一次循环的时候当 AQS 队列状态如图(默认情况)所示,头尾均指向 null;当执行到代码(1)时候,节点 t 指向了尾部节点,队列状态如图步骤(1)所示,这时 t 为 null,执行代码(2)时候使用 CAS 算法设置一个哨兵节点为头节点,如果设置成功则让尾部节点也指向哨兵节点,这时队列状态如图步骤(2)所示。
接下来我们还需要插入 node 节点,所以在第二次循环后又执行到代码(1),队列状态如下图步骤(3)所示;然后执行代码(3)设置 node 的前驱节点为尾部节点,队列状态如下图步骤(4)所示;随后通过 CAS 算法来设置 node 节点为尾部节点,CAS 成功后队列状态如下图步骤(5)所示;随后将原来的尾部节点的后驱节点设置为 node 节点,就完成了双向链表。队列状态如下图步骤(6)所示。
AQS——条件变量的支持
synchronized 和条件变量一样都可以实现线程同步,它们的不同在于 synchronized 同时只能和一个共享变量 notify 或 wait 方法实现同步,而 AQS 的一个锁可以对应多个条件变量。
接下来我们看一下例子。
public static void main(String[] args) { final ReentrantLock lock = new ReentrantLock();// (1) final Condition condition = lock.newCondition();// (2) lock.lock(); // (3) try { System.out.println("begin wait..."); condition.await(); // (4) System.out.println("end wait..."); } catch (Exception e) { lock.unlock(); // (5) } lock.lock(); // (6) try { System.out.println("begin signal..."); condition.signal(); // (7) System.out.println("end signal..."); } catch (Exception e) { lock.unlock(); // (8) } }
这段代码首先创建另一个独占锁 ReentrantLock 对象,也是基于 AQS 实现的。
第二步使用创建的 Lock 对象的 newCondition()方法创建了一个 ConditionObject 变量,这个变量就是 Lock 锁对应的一个条件变量。
📢 一个 Lock 对象可以创建多个条件变量。
第三步获取独占锁,随后第四步调用条件变量的 await()方法阻塞挂起当前线程。当其他线程调用了条件变量的 signal()方法时,被阻塞的线程的才会从 await 处返回,需要注意,和调用 Object 的 wait()方法一样,如果没有获取到锁就调用的话,则会抛出 IllegalMonitorStateException 异常。第五步释放获取的锁。
在上面代码中,lock.newCondition()的作用其实是 new 了一个在 AQS 内部声明的 ConditionObject 对象,ConditionObject 是 AQS 的内部类,可以访问 AQS 内部的变量(例如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列(单向链表队列),用来存放调用条件变量的 await()方法时被阻塞的线程。注意这个条件队列和 AQS 队列不是一回事。
我们看一下 await()方法源码:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 创建新的node节点,并插入到条件队列末尾(1) Node node = addConditionWaiter(); // 释放锁并返回状态位(2) int savedState = fullyRelease(node); int interruptMode = 0; // 调用park方法阻塞挂起当前线程(3) while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //... }
该方法中,当线程调用条件变量的 await()方法时,在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入到条件队列末尾,之后当前线程会释放获取到的锁,也就是操作 state 值,并被阻塞挂起。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
当另外一个线程调用条件变量的 signal 方法时(必须先调用锁的 lock()方法获取锁),在内部会把条件队列里面的一个线程节点从条件队列里面移除并放入 AQS 的阻塞队列里面,然后激活这个线程。
📢 需要注意的是,AQS 只提供了 ConditionObject 的实现,并没有提供 newCondition 函数,需要子类实现。
下面看一下在 await()方法阻塞后,如何放入条件队列的。
private Node addConditionWaiter() { //获取尾部节点 Node t = lastWaiter; // 如果lastWaiter不为空,则检查该队列是否有被Cancel的节点 if (t != null && t.waitStatus != Node.CONDITION) { //遍历条件队列节点,移除已被取消的节点 unlinkCancelledWaiters(); t = lastWaiter; } //利用当前线程构建一个代表当前线程的节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; //没有尾节点就插入到头节点 else t.nextWaiter = node;//尾节点的后驱节点等于当前节点 lastWaiter = node; //尾节点等于当前节点 return node; }
📢 注意:当多个线程调用 lock.lock()方法时,只有一个线程获取到锁,其他线程就会被转换到 Node 节点插入到对应的 AQS 阻塞队列,并自旋 CAS 尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的 await 方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。
当另外一个线程调用条件变量的 signal 或者 signalAll 方法时,会把条件队列里面的一个或者全部 Node 节点移动到 AQS 的阻塞队列里面,等待时机获取锁。
总结:一个锁对应一个 AQS 阻塞队列,对应多个条件变量,每个条件变量都有自己的一个条件队列。
实现自定义独占锁
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2022/1/20 实现自定义独占锁 * @Description */ public class NonReentrantLock implements Lock, Serializable { //自定义实现AQS private static class Sync extends AbstractQueuedSynchronizer { //是否持有锁 @Override protected boolean isHeldExclusively() { return getState() == 1; } //如果state == 0 尝试获取锁 @Override protected boolean tryAcquire(int arg) { assert arg == 1; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //尝试释放锁 设置state == 0 @Override protected boolean tryRelease(int arg) { assert arg == 1; if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } //提供条件变量接口 Condition newCondition() { return new ConditionObject(); } } private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } public boolean isLocked() { return sync.isHeldExclusively(); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
根据自定义锁实现生产者消费者
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2022/1/20 * @Description 生产者消费者模型 */ public class LockTest { final static NonReentrantLock lock = new NonReentrantLock(); final static Condition consumerCondition = lock.newCondition(); final static Condition producerCondition = lock.newCondition(); final static Queue<String> QUEUE = new LinkedBlockingQueue<>(); final static int QUEUE_SIZE = 10; public static void main(String[] args) { LockTest lockTest = new LockTest(); // 启消费者线程 for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 2; j++) { try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者消费了" + lockTest.get()); } }, "consumer_" + i).start(); } // 启动生产者线程 for (int i = 0; i < 2; i++) { new Thread(() -> { for (int j = 0; j < 10; j++) { try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200)); } catch (InterruptedException e) { e.printStackTrace(); } lockTest.put("物品-" + new Random().nextInt(1000)); } }, "产品-" + i).start(); } } private void put(String name) { //获取独占锁 lock.lock(); try { //如果队列满了,则等待 while (QUEUE.size() == QUEUE_SIZE) { producerCondition.await(); } QUEUE.add(name); System.out.println(Thread.currentThread().getName() + "生产了" + name); //唤醒消费线程 consumerCondition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } private String get() { String ret = ""; //获取独占锁 lock.lock(); try { //如果队列空了,则等待 while (QUEUE.size() == 0) { consumerCondition.await(); } //消费一个元素 ret = QUEUE.poll(); //唤醒生产线程 producerCondition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return ret; } }