非公平锁实现原理
加锁解锁流程
先从构造器开始看,默认为非公平锁实现
1. public ReentrantLock() { 2. sync = new NonfairSync(); 3. }
NonfairSync 继承自 AQS
没有竞争时
第一个竞争出现时
Thread-1 执行了
1. CAS 尝试将 state 由 0 改为 1,结果失败
2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
3. 接下来进入 addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- 1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- 4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true 6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
1. // Sync 继承自 AQS 2. 3. static final class NonfairSync extends Sync { 4. private static final long serialVersionUID = 7316153563782823691L; 5. 6. // 加锁实现 7. final void lock() { 8. // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁 9. 10. if (compareAndSetState(0, 1)) 11. setExclusiveOwnerThread(Thread.currentThread()); 12. else 13. // 如果尝试失败,进入 ㈠ 14. acquire(1); 15. } 16. // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处 17. public final void acquire(int arg) { 18. // ㈡ tryAcquire 19. if (!tryAcquire(arg) && 20. // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤ 21. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 22. ) { 23. selfInterrupt(); 24. } 25. } 26. 27. // ㈡ 进入 ㈢ 28. protected final boolean tryAcquire(int acquires) { 29. return nonfairTryAcquire(acquires); 30. } 31. 32. // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处 33. final boolean nonfairTryAcquire(int acquires) { 34. final Thread current = Thread.currentThread(); 35. int c = getState(); 36. // 如果还没有获得锁 37. 38. if (c == 0) { 39. // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列 40. 41. if (compareAndSetState(0, acquires)) { 42. setExclusiveOwnerThread(current); 43. return true; 44. } 45. } 46. // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 47. 48. else if (current == getExclusiveOwnerThread()) { 49. // state++ 50. 51. int nextc = c + acquires; 52. if (nextc < 0) // overflow 53. 54. throw new Error("Maximum lock count exceeded"); 55. setState(nextc); 56. return true; 57. } 58. // 获取失败, 回到调用处 59. 60. return false; 61. } 62. 63. // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处 64. 65. private Node addWaiter(Node mode) { 66. // 将当前线程关联到一个 Node 对象上, 模式为独占模式 67. 68. Node node = new Node(Thread.currentThread(), mode); 69. // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部 70. 71. Node pred = tail; 72. if (pred != null) { 73. node.prev = pred; 74. if (compareAndSetTail(pred, node)) { 75. // 双向链表 76. 77. pred.next = node; 78. return node; 79. } 80. } 81. // 尝试将 Node 加入 AQS, 进入 ㈥ 82. 83. enq(node); 84. return node; 85. } 86. 87. // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处 88. 89. private Node enq(final Node node) { 90. for (; ; ) { 91. Node t = tail; 92. if (t == null) { 93. // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0) 94. 95. if (compareAndSetHead(new Node())) { 96. tail = head; 97. } 98. } else { 99. // cas 尝试将 Node 对象加入 AQS 队列尾部 100. 101. node.prev = t; 102. if (compareAndSetTail(t, node)) { 103. t.next = node; 104. return t; 105. } 106. } 107. } 108. } 109. 110. // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处 111. 112. final boolean acquireQueued(final Node node, int arg) { 113. boolean failed = true; 114. try { 115. boolean interrupted = false; 116. for (; ; ) { 117. final Node p = node.predecessor(); 118. // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取 119. 120. if (p == head && tryAcquire(arg)) { 121. // 获取成功, 设置自己(当前线程对应的 node)为 head 122. 123. setHead(node); 124. // 上一个节点 help GC 125. 126. p.next = null; 127. failed = false; 128. // 返回中断标记 false 129. 130. return interrupted; 131. } 132. if ( 133. // 判断是否应当 park, 进入 ㈦ 134. 135. shouldParkAfterFailedAcquire(p, node) && 136. 137. // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧ 138. 139. parkAndCheckInterrupt() 140. ) { 141. interrupted = true; 142. } 143. } 144. } finally { 145. if (failed) 146. cancelAcquire(node); 147. } 148. } 149. 150. // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处 151. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 152. // 获取上一个节点的状态 153. int ws = pred.waitStatus; 154. if (ws == Node.SIGNAL) { 155. // 上一个节点都在阻塞, 那么自己也阻塞好了 156. return true; 157. } 158. // > 0 表示取消状态 159. if (ws > 0) { 160. // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试 161. do { 162. node.prev = pred = pred.prev; 163. } while (pred.waitStatus > 0); 164. pred.next = node; 165. } else { 166. // 这次还没有阻塞 167. // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL 168. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 169. } 170. return false; 171. } 172. 173. // ㈧ 阻塞当前线程 174. private final boolean parkAndCheckInterrupt() { 175. LockSupport.park(this); 176. return Thread.interrupted(); 177. } 178. }
注意 是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定
解锁源码
1. // Sync 继承自 AQS 2. 3. static final class NonfairSync extends Sync { 4. // 解锁实现 5. public void unlock() { 6. sync.release(1); 7. } 8. 9. // AQS 继承过来的方法, 方便阅读, 放在此处 10. public final boolean release(int arg) { 11. // 尝试释放锁, 进入 ㈠ 12. if (tryRelease(arg)) { 13. // 队列头节点 unpark 14. Node h = head; 15. if ( 16. // 队列不为 null 17. h != null && 18. // waitStatus == Node.SIGNAL 才需要 unpark 19. h.waitStatus != 0 20. ) { 21. // unpark AQS 中等待的线程, 进入 ㈡ 22. 23. unparkSuccessor(h); 24. } 25. return true; 26. } 27. return false; 28. } 29. 30. // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处 31. 32. protected final boolean tryRelease(int releases) { 33. // state-- 34. int c = getState() - releases; 35. if (Thread.currentThread() != getExclusiveOwnerThread()) 36. throw new IllegalMonitorStateException(); 37. boolean free = false; 38. // 支持锁重入, 只有 state 减为 0, 才释放成功 39. 40. if (c == 0) { 41. free = true; 42. setExclusiveOwnerThread(null); 43. } 44. setState(c); 45. return free; 46. } 47. 48. // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处 49. 50. private void unparkSuccessor(Node node) { 51. // 如果状态为 Node.SIGNAL 尝试重置状态为 0 52. // 不成功也可以 53. int ws = node.waitStatus; 54. if (ws < 0) { 55. compareAndSetWaitStatus(node, ws, 0); 56. } 57. // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的 58. Node s = node.next; 59. // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点 60. if (s == null || s.waitStatus > 0) { 61. s = null; 62. for (Node t = tail; t != null && t != node; t = t.prev) 63. if (t.waitStatus <= 0) 64. s = t; 65. } 66. if (s != null) 67. LockSupport.unpark(s.thread); 68. } 69. }