非公平锁实现原理+源码解读

简介: 非公平锁实现原理+源码解读

非公平锁实现原理

加锁解锁流程

先从构造器开始看,默认为非公平锁实现

1. public ReentrantLock() {
2.     sync = new NonfairSync();
3. }

NonfairSync 继承自 AQS

没有竞争时

第一个竞争出现时

Thread-1 执行了

1. CAS 尝试将 state 由 0 改为 1,结果失败

2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败

3. 接下来进入 addWaiter 逻辑,构造 Node 队列

  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquireQueued 逻辑

  • 1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  • 2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  • 3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

  • 4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  • 5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true 6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

再次有多个线程经历上述过程竞争失败,变成这个样子

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

回到 Thread-1 的 acquireQueued 流程

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先

Thread-4 被设置为 exclusiveOwnerThread,state = 1

Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁源码

1. // Sync 继承自 AQS
2. 
3. static final class NonfairSync extends Sync {
4. private static final long serialVersionUID = 7316153563782823691L;
5. 
6. // 加锁实现
7. final void lock() {
8. // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
9. 
10. if (compareAndSetState(0, 1))
11.                 setExclusiveOwnerThread(Thread.currentThread());
12. else
13. // 如果尝试失败,进入 ㈠
14.                 acquire(1);
15.         }
16. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
17. public final void acquire(int arg) {
18. // ㈡ tryAcquire
19. if (!tryAcquire(arg) &&
20. // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
21.                             acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
22.             ) {
23.                 selfInterrupt();
24.             }
25.         }
26. 
27. // ㈡ 进入 ㈢
28. protected final boolean tryAcquire(int acquires) {
29. return nonfairTryAcquire(acquires);
30.         }
31. 
32. // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
33. final boolean nonfairTryAcquire(int acquires) {
34. final Thread current = Thread.currentThread();
35. int c = getState();
36. // 如果还没有获得锁
37. 
38. if (c == 0) {
39. // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
40. 
41. if (compareAndSetState(0, acquires)) {
42.                     setExclusiveOwnerThread(current);
43. return true;
44.                 }
45.             }
46. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
47. 
48. else if (current == getExclusiveOwnerThread()) {
49. // state++
50. 
51. int nextc = c + acquires;
52. if (nextc < 0) // overflow
53. 
54. throw new Error("Maximum lock count exceeded");
55.                 setState(nextc);
56. return true;
57.             }
58. // 获取失败, 回到调用处
59. 
60. return false;
61.         }
62. 
63. // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
64. 
65. private Node addWaiter(Node mode) {
66. // 将当前线程关联到一个 Node 对象上, 模式为独占模式
67. 
68. Node node = new Node(Thread.currentThread(), mode);
69. // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
70. 
71. Node pred = tail;
72. if (pred != null) {
73.                 node.prev = pred;
74. if (compareAndSetTail(pred, node)) {
75. // 双向链表
76. 
77.                     pred.next = node;
78. return node;
79.                 }
80.             }
81. // 尝试将 Node 加入 AQS, 进入 ㈥
82. 
83.             enq(node);
84. return node;
85.         }
86. 
87. // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
88. 
89. private Node enq(final Node node) {
90. for (; ; ) {
91. Node t = tail;
92. if (t == null) {
93. // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
94. 
95. if (compareAndSetHead(new Node())) {
96.                         tail = head;
97.                     }
98.                 } else {
99. // cas 尝试将 Node 对象加入 AQS 队列尾部
100. 
101.                     node.prev = t;
102. if (compareAndSetTail(t, node)) {
103.                         t.next = node;
104. return t;
105.                     }
106.                 }
107.             }
108.         }
109. 
110. // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
111. 
112. final boolean acquireQueued(final Node node, int arg) {
113. boolean failed = true;
114. try {
115. boolean interrupted = false;
116. for (; ; ) {
117. final Node p = node.predecessor();
118. // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
119. 
120. if (p == head && tryAcquire(arg)) {
121. // 获取成功, 设置自己(当前线程对应的 node)为 head
122. 
123.                         setHead(node);
124. // 上一个节点 help GC
125. 
126.                         p.next = null;
127.                         failed = false;
128. // 返回中断标记 false
129. 
130. return interrupted;
131.                     }
132. if (
133. // 判断是否应当 park, 进入 ㈦
134. 
135.                             shouldParkAfterFailedAcquire(p, node) &&
136. 
137. // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
138. 
139.                                     parkAndCheckInterrupt()
140.                     ) {
141.                         interrupted = true;
142.                     }
143.                 }
144.             } finally {
145. if (failed)
146.                     cancelAcquire(node);
147.             }
148.         }
149. 
150. // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
151. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
152. // 获取上一个节点的状态
153. int ws = pred.waitStatus;
154. if (ws == Node.SIGNAL) {
155. // 上一个节点都在阻塞, 那么自己也阻塞好了
156. return true;
157.             }
158. // > 0 表示取消状态
159. if (ws > 0) {
160. // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
161. do {
162.                     node.prev = pred = pred.prev;
163.                 } while (pred.waitStatus > 0);
164.                 pred.next = node;
165.             } else {
166. // 这次还没有阻塞
167. // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
168.                 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
169.             }
170. return false;
171.         }
172. 
173. // ㈧ 阻塞当前线程
174. private final boolean parkAndCheckInterrupt() {
175.             LockSupport.park(this);
176. return Thread.interrupted();
177.         }
178.     }

注意 是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定

解锁源码

1. // Sync 继承自 AQS
2. 
3. static final class NonfairSync extends Sync {
4. // 解锁实现
5. public void unlock() {
6.             sync.release(1);
7.         }
8. 
9. // AQS 继承过来的方法, 方便阅读, 放在此处
10. public final boolean release(int arg) {
11. // 尝试释放锁, 进入 ㈠
12. if (tryRelease(arg)) {
13. // 队列头节点 unpark
14. Node h = head;
15. if (
16. // 队列不为 null
17.                         h != null &&
18. // waitStatus == Node.SIGNAL 才需要 unpark
19.                                 h.waitStatus != 0
20.                 ) {
21. // unpark AQS 中等待的线程, 进入 ㈡
22. 
23.                     unparkSuccessor(h);
24.                 }
25. return true;
26.             }
27. return false;
28.         }
29. 
30. // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
31. 
32. protected final boolean tryRelease(int releases) {
33. // state--
34. int c = getState() - releases;
35. if (Thread.currentThread() != getExclusiveOwnerThread())
36. throw new IllegalMonitorStateException();
37. boolean free = false;
38. // 支持锁重入, 只有 state 减为 0, 才释放成功
39. 
40. if (c == 0) {
41.                 free = true;
42.                 setExclusiveOwnerThread(null);
43.             }
44.             setState(c);
45. return free;
46.         }
47. 
48. // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
49. 
50. private void unparkSuccessor(Node node) {
51. // 如果状态为 Node.SIGNAL 尝试重置状态为 0
52. // 不成功也可以
53. int ws = node.waitStatus;
54. if (ws < 0) {
55.                 compareAndSetWaitStatus(node, ws, 0);
56.             }
57. // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
58. Node s = node.next;
59. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
60. if (s == null || s.waitStatus > 0) {
61.                 s = null;
62. for (Node t = tail; t != null && t != node; t = t.prev)
63. if (t.waitStatus <= 0)
64.                         s = t;
65.             }
66. if (s != null)
67.                 LockSupport.unpark(s.thread);
68.         }
69.     }



相关文章
|
6月前
|
存储 监控 安全
吃透synchronized实现原理
吃透synchronized实现原理
85 0
|
6月前
|
安全 Java
Java并发编程:Synchronized及其实现原理
Java并发编程:Synchronized及其实现原理
53 4
图解ReentrantLock底层公平锁和非公平锁实现原理
图解ReentrantLock底层公平锁和非公平锁实现原理
181 0
AQS源码解读之一
AQS源码解读之一
44 0
|
存储 SQL 监控
Java线程池实现原理详解
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因
115 0
Java线程池实现原理详解
ReentrantLock源码分析笔记-单线程公平锁为例
ReentrantLock源码分析笔记-单线程公平锁为例
82 0
ReentrantLock源码分析笔记-单线程公平锁为例
|
机器学习/深度学习 安全 Java
java并发原理实战(10)--AQS 和公平锁分析
java并发原理实战(10)--AQS 和公平锁分析
131 0
java并发原理实战(10)--AQS 和公平锁分析
|
存储 安全 Java
java并发原理实战(5)--线程安全性问题和synchronized原理理解
java并发原理实战(5)--线程安全性问题和synchronized原理理解
java并发原理实战(5)--线程安全性问题和synchronized原理理解
|
安全 Java
java并发原理实战(9)--手动实现一个可重入锁
java并发原理实战(9)--手动实现一个可重入锁
122 0
java并发原理实战(9)--手动实现一个可重入锁
|
存储
ReentrantLock 原理解析(中)
ReentrantLock 原理解析
129 0
ReentrantLock 原理解析(中)