共享模式
获取共享资源 acquireShared
public final void acquireShared(int arg) { // 小于 0 表示获取资源失败 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { // 添加到节点 此处是共享节点 final Node node = addWaiter(Node.SHARED); // 根据是否拿到资源 判断是否需要取消 boolean failed = true; try { boolean interrupted = false; for (;;) { // 返回前一个节点 final Node p = node.predecessor(); if (p == head) { // 再次尝试获取共享资源 int r = tryAcquireShared(arg); // 表示获取成功 if (r >= 0) { // 设置当前节点为头节点 并尝试唤醒后续节点 setHeadAndPropagate(node, r); // 释放头节点 GC 会回收 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- tryAcquireShared(arg),尝试获取资源,这块由子类实现;
- 返回值分为 3 种:
- 小于 0: 表示失败;
- 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
- 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。
- 在失败后会使用
doAcquireShared(arg);
不断获取资源; final Node node = addWaiter(Node.SHARED);
同样会创建节点;- 在循环中不断判断前一个节点如果是 head,则尝试获取资源;
- 在共享模式下获取到资源后会使用
setHeadAndPropagate(node, r);
设置头节点,同时唤醒后续节点。
设置头节点,并传播唤醒后续节点
// node 是当前节点 // propagate 是 前一步 tryAcquireShared 的返回值 进来时 >=0 // 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。 private void setHeadAndPropagate(Node node, int propagate) { // 记录下当前头节点 Node h = head; // Record old head for check below // 设置传入 node 为头节点 setHead(node); // 判断条件,唤醒后续节点 // propagate > 0 有后续资源 // h == null 旧的头节点 因为前面 addWaiter, 肯定不会为空,应该是防止 h.waitStatus < 0 空指针的写法 // (h = head) == null 当前的 头节点,再判断状态 // waitStatus < 0 后续节点就需要被唤醒 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 后续节点为共享,则需要唤醒 if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared() 释放共享资源
private void doReleaseShared() { // 循环 for (;;) { // 从头开始 Node h = head; // 判断队列是否为空,就是刚初始化 if (h != null && h != tail) { int ws = h.waitStatus; // SIGNAL( -1 后续线程需要释放) if (ws == Node.SIGNAL) { // 将等待状态更新为 0 如果失败,会循环 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后续节点, 同时将当前节点设置为 取消 unparkSuccessor(h); } // 如果状态是 0 则会更新状态为 PROPAGATE // PROPAGATE ( -3 releaseShared 应该被传播到其他节点) else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 判断头节点有没有变化,有变化 是因为竞争,别的线程获取到了锁,会继续循环 // 没有变化直接结束 if (h == head) // loop if head changed break; } }
- 从头节点开始进行,如果 h != null && h != tail 说明队列不是空或者刚初始化;
- 节点状态为 SIGNAL( -1 )说明后续线程需要释放;
- 会更改当前节点状态,成功后唤醒后续节点,失败则继续循环;
- 节点状态如果是 0 则更新为 PROPAGATE,会将状态传播。
释放共享资源 releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 释放共享资源 doReleaseShared(); return true; } return false; }
以共享模式释放。 通过释放一个或多个线程,如果实现tryReleaseShared返回true。
总结
Q: AQS 到底是什么?
A: AQS 内部提供了一个先入先出(FIFO)双向等待队列,内部依靠 Node 实现,并提供了在独占模式
和共享模式
下的出入队列的公共方法。而关于状态信息 state 的定义是由子类实现。tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared等尝试获取资源操作都是由子类进行定义和实现的。而 AQS 中提供了子类获取资源之后的相关操作,包括节点 Node 的出入队列,自旋获取资源等等。
Q: AQS 获取资源失败后会如何操作?
A: 线程获取资源失败后,会放到等待队列中,在队列中会不断尝试获取资源(自旋),说明线程只是进入等待状态,后面还是可以再次获取资源的。
Q: AQS 等待队列的数据结构是什么?
A: CLH变体的先入先出(FIFO)双向等待队列。(CLH锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)
Q: AQS 等待队列中的节点如何获取获取和释放资源的?
A: 可以看下独占模式
中的讲述过程,通过代码梳理。
本文分别从 独占模式
和 共享模式
介绍的 AQS 基本逻辑,并通过源码和作图理解基本思路。但是并没有对需要子类实现的业务逻辑做介绍。这块会在后面介绍 ReentrantLock
、CountDownLatch
等子类的时候做介绍。