这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束
写锁上锁流程
1. static final class NonfairSync extends Sync { 2. // ... 省略无关代码 3. // 外部类 WriteLock 方法, 方便阅读, 放在此处 4. public void lock() { 5. sync.acquire(1); 6. } 7. 8. // AQS 继承过来的方法, 方便阅读, 放在此处 9. public final void acquire(int arg) { 10. if ( 11. // 尝试获得写锁失败 12. !tryAcquire(arg) && 13. // 将当前线程关联到一个 Node 对象上, 模式为独占模式 14. // 进入 AQS 队列阻塞 15. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 16. ) { 17. selfInterrupt(); 18. } 19. } 20. 21. // Sync 继承过来的方法, 方便阅读, 放在此处 22. protected final boolean tryAcquire(int acquires) { 23. // 获得低 16 位, 代表写锁的 state 计数 24. Thread current = Thread.currentThread(); 25. int c = getState(); 26. int w = exclusiveCount(c); 27. if (c != 0) { 28. if ( 29. // c != 0 and w == 0 表示有读锁, 或者 30. w == 0 || 31. // 如果 exclusiveOwnerThread 不是自己 32. current != getExclusiveOwnerThread() 33. ) { 34. // 获得锁失败 35. return false; 36. } 37. // 写锁计数超过低 16 位, 报异常 38. if (w + exclusiveCount(acquires) > MAX_COUNT) 39. throw new Error("Maximum lock count exceeded"); 40. // 写锁重入, 获得锁成功 41. setState(c + acquires); 42. return true; 43. } 44. if ( 45. // 判断写锁是否该阻塞, 或者 46. writerShouldBlock() || 47. // 尝试更改计数失败 48. !compareAndSetState(c, c + acquires) 49. ) { 50. // 获得锁失败 51. return false; 52. } 53. // 获得锁成功 54. setExclusiveOwnerThread(current); 55. return true; 56. } 57. 58. // 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞 59. final boolean writerShouldBlock() { 60. return false; 61. } 62. }
写锁释放流程
1. static final class NonfairSync extends Sync { 2. // ... 省略无关代码 3. // WriteLock 方法, 方便阅读, 放在此处 4. public void unlock() { 5. sync.release(1); 6. } 7. // AQS 继承过来的方法, 方便阅读, 放在此处 8. public final boolean release(int arg) { 9. // 尝试释放写锁成功 10. if (tryRelease(arg)) { 11. // unpark AQS 中等待的线程 12. Node h = head; 13. if (h != null && h.waitStatus != 0) 14. unparkSuccessor(h); 15. return true; 16. } 17. return false; 18. } 19. // Sync 继承过来的方法, 方便阅读, 放在此处 20. protected final boolean tryRelease(int releases) { 21. if (!isHeldExclusively()) 22. throw new IllegalMonitorStateException(); 23. int nextc = getState() - releases; 24. // 因为可重入的原因, 写锁计数为 0, 才算释放成功 25. boolean free = exclusiveCount(nextc) == 0; 26. if (free) { 27. setExclusiveOwnerThread(null); 28. } 29. setState(nextc); 30. return free; 31. } 32. }
读锁上锁流程
1. static final class NonfairSync extends Sync { 2. // ReadLock 方法, 方便阅读, 放在此处 3. public void lock() { 4. sync.acquireShared(1); 5. } 6. 7. // AQS 继承过来的方法, 方便阅读, 放在此处 8. public final void acquireShared(int arg) { 9. // tryAcquireShared 返回负数, 表示获取读锁失败 10. if (tryAcquireShared(arg) < 0) { 11. doAcquireShared(arg); 12. } 13. } 14. 15. // Sync 继承过来的方法, 方便阅读, 放在此处 16. protected final int tryAcquireShared(int unused) { 17. Thread current = Thread.currentThread(); 18. int c = getState(); 19. // 如果是其它线程持有写锁, 获取读锁失败 20. if ( 21. exclusiveCount(c) != 0 && 22. getExclusiveOwnerThread() != current 23. ) { 24. return -1; 25. } 26. int r = sharedCount(c); 27. if ( 28. // 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且 29. !readerShouldBlock() && 30. // 小于读锁计数, 并且 31. r < MAX_COUNT && 32. // 尝试增加计数成功 33. compareAndSetState(c, c + SHARED_UNIT) 34. ) { 35. // ... 省略不重要的代码 36. return 1; 37. } 38. return fullTryAcquireShared(current); 39. } 40. 41. // 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁 42. // true 则该阻塞, false 则不阻塞 43. final boolean readerShouldBlock() { 44. return apparentlyFirstQueuedIsExclusive(); 45. } 46. 47. // AQS 继承过来的方法, 方便阅读, 放在此处 48. // 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞 49. final int fullTryAcquireShared(Thread current) { 50. HoldCounter rh = null; 51. for (; ; ) { 52. int c = getState(); 53. if (exclusiveCount(c) != 0) { 54. if (getExclusiveOwnerThread() != current) 55. return -1; 56. } else if (readerShouldBlock()) { 57. // ... 省略不重要的代码 58. } 59. if (sharedCount(c) == MAX_COUNT) 60. throw new Error("Maximum lock count exceeded"); 61. if (compareAndSetState(c, c + SHARED_UNIT)) { 62. // ... 省略不重要的代码 63. return 1; 64. } 65. } 66. } 67. 68. // AQS 继承过来的方法, 方便阅读, 放在此处 69. private void doAcquireShared(int arg) { 70. // 将当前线程关联到一个 Node 对象上, 模式为共享模式 71. final Node node = addWaiter(Node.SHARED); 72. boolean failed = true; 73. try { 74. boolean interrupted = false; 75. for (; ; ) { 76. final Node p = node.predecessor(); 77. if (p == head) { 78. // 再一次尝试获取读锁 79. int r = tryAcquireShared(arg); 80. // 成功 81. if (r >= 0) { 82. // (一) 83. // r 表示可用资源数, 在这里总是 1 允许传播 84. //(唤醒 AQS 中下一个 Share 节点) 85. setHeadAndPropagate(node, r); 86. p.next = null; // help GC 87. if (interrupted) 88. selfInterrupt(); 89. failed = false; 90. return; 91. } 92. } 93. if ( 94. // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL) 95. shouldParkAfterFailedAcquire(p, node) && 96. // park 当前线程 97. parkAndCheckInterrupt() 98. ) { 99. interrupted = true; 100. } 101. } 102. } finally { 103. if (failed) 104. cancelAcquire(node); 105. } 106. } 107. 108. // (一) AQS 继承过来的方法, 方便阅读, 放在此处 109. private void setHeadAndPropagate(Node node, int propagate) { 110. Node h = head; // Record old head for check below 111. // 设置自己为 head 112. setHead(node); 113. // propagate 表示有共享资源(例如共享读锁或信号量) 114. // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE 115. // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE 116. if (propagate > 0 || h == null || h.waitStatus < 0 || 117. (h = head) == null || h.waitStatus < 0) { 118. Node s = node.next; 119. // 如果是最后一个节点或者是等待共享读锁的节点 120. if (s == null || s.isShared()) { 121. // 进入 (二) 122. doReleaseShared(); 123. } 124. } 125. } 126. // (二) AQS 继承过来的方法, 方便阅读, 放在此处 127. private void doReleaseShared() { 128. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark 129. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析 130. for (;;) { 131. Node h = head; 132. // 队列还有节点 133. if (h != null && h != tail) { 134. int ws = h.waitStatus; 135. if (ws == Node.SIGNAL) { 136. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 137. continue; // loop to recheck cases 138. // 下一个节点 unpark 如果成功获取读锁 139. // 并且下下个节点还是 shared, 继续 doReleaseShared 140. unparkSuccessor(h); 141. } 142. else if (ws == 0 && 143. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 144. continue; // loop on failed CAS 145. } 146. if (h == head) // loop if head changed 147. break; 148. } 149. } 150. }
读锁释放流程
1. static final class NonfairSync extends Sync { 2. // ReadLock 方法, 方便阅读, 放在此处 3. public void unlock() { 4. sync.releaseShared(1); 5. } 6. // AQS 继承过来的方法, 方便阅读, 放在此处 7. public final boolean releaseShared(int arg) { 8. if (tryReleaseShared(arg)) { 9. doReleaseShared(); 10. return true; 11. } 12. return false; 13. } 14. // Sync 继承过来的方法, 方便阅读, 放在此处 15. protected final boolean tryReleaseShared(int unused) { 16. // ... 省略不重要的代码 17. for (;;) { 18. int c = getState(); 19. int nextc = c - SHARED_UNIT; 20. if (compareAndSetState(c, nextc)) { 21. // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程 22. // 计数为 0 才是真正释放 23. return nextc == 0; 24. } 25. } 26. } 27. // AQS 继承过来的方法, 方便阅读, 放在此处 28. private void doReleaseShared() { 29. // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark 30. // 如果 head.waitStatus == 0 ==> Node.PROPAGATE 31. for (;;) { 32. Node h = head; 33. if (h != null && h != tail) { 34. int ws = h.waitStatus; 35. // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0 36. // 防止 unparkSuccessor 被多次执行 37. if (ws == Node.SIGNAL) { 38. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 39. continue; // loop to recheck cases 40. unparkSuccessor(h); 41. } 42. // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析 43. else if (ws == 0 && 44. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 45. continue; // loop on failed CAS 46. } 47. if (h == head) // loop if head changed 48. break; 49. } 50. } 51. }