并发编程是Java中非常关键的基本功,并发编程包括的知识点如下图,非常之多,锁是保证并发的一种手段之一,本文将围绕Aqs,ReentrantLock,ReentrantReadWriteLock的实现来分析锁设计的思想。
Jdk为什么需要设计Lock?
使用了Sychronized之后,发现Sychronized提供的锁功能存在一些短板,比如不能取消,不可以等待,不可以中断,而Lock就弥补了Sychronized的这些缺陷,下面是区别,所以Lock有他存在的必要性。
Lock与Sychronized区别、特性
功能方面 1、Synchronized Jvm关键字实现 互斥锁实现。monitorenter和monitorexit 2、Synchronized 使用范围包括操作方法,对象,同步代码块 3、Synchornized 可重入 不可中断 不可等待取消 非公平,而Lock是可重入,可中断,可等待取消,支持公平/非公平(默认) 4、Synchronized在线程发生异常时会自动释放锁,因此不会发生异常死锁。Lock异常时不会自动释放锁,所以需要在finally中实现释放锁。 5、Lock是可以中断锁,Synchronized是非中断锁,必须等待线程执行完成释放锁。 6、Lock可以使用读锁提高多线程读效率,如ReentrantLock。 7、在Lock中可以使用多组Condition实现多组线程多条件通信,而Sychronized使用wait/notify只能针对一组线程通信。
在实现方面 1、都使用了cas+volatile+自旋。 2、排队抢锁的线程,sychronized使用了两个队列,而reentrantlock只使用了一个队列。 3、sychronized有自适应自旋,而Lock没有,Lock只有自旋和park.
在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?
排队+wait状态 通过前一个线程唤醒后一个线程
非公平锁的获取锁设计思想
1、一请求获取锁,就尝试获取锁 2、获取锁失败,继续尝试一定次数来获取锁 3、获取锁成功,通过cas实现state原子字段自增,exclusiveOwnerThread设置成当前变量 4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。 5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程
非公平锁的释放锁设计思想:
1、unlock方法调用一次将state减少1, 2、如果state变成了0,则exclusiveOwnerThread设置成null 3、释放锁成功后,需要将等待获取锁的线程唤醒。
Condition设计思想:
类似于Object的wait方法,notify方法,实现线程直接的通信。 在生产者消费者模型的例子里: 生产者线程发现消息队列满了,则调用await方法阻塞, 生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费 消费者线程发现消息队列空了,则调用await方法阻塞, 消费者线程消费消息成功,则调用signal方法通知生产者进行生产
Condition的await方法设计思想
当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源
/**
* Implements interruptible condition wait.
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁资源,让其他线程有机会获得锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//阻塞,如果被唤醒,会继续执行
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后再次获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Conditon的signal方法设计思想
需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列
/**
* 获取一个等待条件最久的线程,
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获得第一个等待被唤醒的线程节点
Node first = firstWaiter;
if (first != null)
//执行唤醒操作
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 移动节点从条件队列到同步队列
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将需要被唤醒的节点加入同步队列
Node p = enq(node);
int ws = p.waitStatus;
//实现唤醒操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
aqs(AbstractQueuedSynchronizer)基础组件提供的能力
1、内部维护了Node类型的双向链表,用来存储等待线程的,以及等待线程的状态,提供线程排队基础。 Node 有 5 中状态,分别是:
- CANCELLED(1): 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点,其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
- SIGNAL(-1): 只要前置节点释放锁,就会通知标识为SIGNAL状态的后续节点的线程抢占锁
- CONDITION(-2): 和 Condition 有关系
- PROPAGATE(-3):共享模式下,PROPAGATE状态的线程处于可运行状态
- 5、0:初始状态
2、一个volatile int state字段,用来表示锁的状态和锁的重入次数 0无锁,>0加锁,提供了get/set/compareAndSwapInt方法来操作state状态。 3、等待线程入队。 4、唤醒后继节点。 5、加锁算法定义。 6、释放独占锁,共享锁算法定义,提供子类重写方法,实现不同的同步组件 7、提供了一些监控锁的方法,等待线程队列长度,排队中的线程, 8、condition辅助类
aqs是对锁的通用功能进行抽象,子类通过继承aqs实现同步的功能。 其中一个实现是ReentrantLock。
ReentrantLock
他是基于aqs实现的一种独占锁,非共享锁。 类结构图如下:
支持公平锁和非公平锁。默认是非公平锁
非公平锁设计思想
1、一请求就尝试获取锁 2、获取锁失败,继续尝试一定次数来获取锁 3、获取锁成功,通过cas实现state字段值原子自增,exclusiveOwnerThread设置成当前线程。 4、如果已经获取到了锁,state自增1,实现可重入特性,当前线程对应的节点设置成头节点。 5、如果尝试获取锁失败,将线程加入队列,并阻塞当前线程 如下是加锁成功的代码。
非公平锁获取锁分四个阶段,详细流程如下
第一阶段,快速占用锁阶段
- 这个阶段就是一调用lock方法,就直接通过cas设置一次state字段加锁,不管有前面没有线程获取锁,这就是体现了
非公平
的地方。
抢锁入口:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
加锁
*/
final void lock() {
//通过cas快速加锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//进入正常加锁流程
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
第二阶段,正常的加锁阶段
- 这个阶段和第一阶段类似,也是使用cas尝试加一次锁,多了重入锁的判断,如果当前锁已经被当前线程占有了,state自增,体现了
重入锁
特性。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//尝试快速加一次锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//重入锁判断和获取
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
第三阶段,当前线程加入队列
这个阶段,是线程入队列,之前尝试两次都没抢到锁,需要排队抢锁
//线程加入等待队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//尝试快速加入队列尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//正常入队列
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize 初始化队头(空节点)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//加入队尾 首先新节点 前置节点执行老尾节点
node.prev = t;
//新的尾节点 设置成 新节点
if (compareAndSetTail(t, node)) {
//老尾节点 指向 新节点
t.next = node;
return t;
}
}
}
}
第四阶段,自旋抢站锁阶段
自旋,根据上一个节点状态来判断是否可以抢占锁,还是park阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
//如果前驱节点是头节点,则具备抢锁资格
if (p == head && tryAcquire(arg)) {
//抢锁成功了,自己作为头节点
setHead(node);
//帮助老的头节点回收
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否需要park阻塞,前一个节点状态是SIGNAL就要park,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果达到了park条件,则可能进入park,
private final boolean parkAndCheckInterrupt() {
//调用unsafe类的park方法 阻塞在这里,等待获取锁的线程释放锁并唤醒 我 然后继续执行
LockSupport.park(this);
return Thread.interrupted();
}
公平/非公平锁释放锁流程
非公平锁的释放锁设计思想:
1、unlock方法调用一次将state减少1, 2、如果state变成了0,则exclusiveOwnerThread设置成null 3、释放锁成功后,需要将等待获取锁的线程唤醒。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果头节点状态不是0,表示需要唤醒头节点的下一个节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//唤醒下一个节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//下一个节点是空的,`从尾节点往前找`,最接近head的状态小于0的线程节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//指向unpark
if (s != null)
LockSupport.unpark(s.thread);
}
上面为啥head.next是空的,还有从尾巴节点遍历找需要唤醒的节点 由于入队操作,先构建尾节点,再将老尾节点指向新节点,next链比prev链延迟构建
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//和尾巴节点
node.prev = t;
if (compareAndSetTail(t, node)) {
//最后一步才和尾节点关联
t.next = node;
return t;
}
}
}
}
以上分析的是非公平锁,他们释放锁流程一致,
公平锁和非公平锁加锁流程有什么不同?
final void lock() {
acquire(1);
}
少了非公平锁的快速抢占锁阶段,在正常获取锁之前,加入hasQueuedPredecessors判断,是否有前驱节点,其它步骤一样, 经过排队抢锁。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//查询是否有任何线程在等待获取比当前线程更长的时间
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
Condition的设计思想
Condition设计思想:
类似于Object的wait方法,notify方法,实现线程之间的通信。 在生产者消费者模型的例子里: 生产者线程发现消息队列满了,则调用await方法阻塞, 生产者线程如果生产消息成功,则调用signal方法通知消费者线程进行消费 消费者线程发现消息队列空了,则调用await方法阻塞, 消费者线程消费消息成功,则调用signal方法通知生产者进行生产
//ConditionObject 设计了两个节点 头节点 尾节点 维护了一个单向链表,维护等待条件的线程链路。
public class ConditionObject implements Condition {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
Condition的await方法设计思想
当前线程需要通过LockSupport.park方法阻塞,释放cpu资源,并且通过调用release方法释放锁,让其他线程可以获取到锁,并且把当前线程存起来,让其他线程唤醒,在阻塞的方法后面,如果被唤醒,需要重新获取锁资源
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁资源,让其他线程有机会获得锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后再次获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Conditon的signal方法设计思想 需要将第一个等待需要唤醒的线程进行唤醒,并且加入同步队列
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获得第一个等待被唤醒的线程节点
Node first = firstWaiter;
if (first != null)
//执行唤醒操作
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//将需要被唤醒的节点加入同步队列
Node p = enq(node);
int ws = p.waitStatus;
//实现唤醒操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
针对中断获取锁 直接抛出中断一次
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//中断状态抛异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//中断状态抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
取消获取锁的线程
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 设置为取消状态
node.waitStatus = Node.CANCELLED;
// 节点在尾处 直接去除 置null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 节点去除
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//唤醒下一个节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
Lock和Sychronized在非公平锁的加锁解锁的实现上差异?
- 快速加锁阶段,lock通过cas,设置state变量,再设置独占线程 而sychronized是通过cas设置对象头上线程id.(偏向锁阶段)
- lock通过 cas一次设置state变量 而sychronized设置锁记录指针到对象头(轻量级别锁)
- lock 先排队,通过自旋 + cas + park 设置state字段,再设置线程对象。而sychronized通过 自旋 + cas + 排队 + park 设置线程id(重量级锁),lock中只有一个队列用户抢锁排队,而sychronized设计了两个队列,2q算法,减少对一个队列的并发处理。
- wait/notify和signal/await的设计思想类型,都使用了一个队列存储等待的线程。结合加解锁api来实现线程通信。
ReentrantReadWriteLock可重入读写锁的实现解析
基于ReentrantLock主要改造点
- 读写锁的标志,通过32位整形变量,高16位存读锁标志,低16位存写锁标志。
- ReentrantReadWriteLock有读和写两把锁,写锁和ReentrantLock实现方式类似,因为他们都是独占锁,而读锁,ReentrantReadWriteLock使用了ThreadLocal对不同线程获取读锁的次数进行隔离,加锁操作,是否锁操作互不影响。
写锁获取源码,和ReentrantLock流程类似
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//锁状态不为0,已经加了读锁或写锁
if (c != 0) {
//写锁为0或者线程不是当前线程,其它线程获取了读锁,则本次获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//锁次数是否会超过上限
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//已经获取了写锁,重入次数自增
// Reentrant acquire
setState(c + acquires);
return true;
}
//写锁是否要阻塞,如果是非公平不需要阻塞,公平要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
//与运算
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 0000 0000 0000 0001 左移16位,再减1
//0001 0000 0000 0000 0000
//0000 1111 1111 1111 1111 ==写锁的标记
//
//计算写锁的数量
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
写锁释放
和Reentrantlock的释放逻辑一致 修改状态,唤醒下一个线程
读锁获取
//最后一个线程 获取读锁 的计数器
private transient HoldCounter cachedHoldCounter;
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//写锁被其它线程占有了,失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//是否要阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//第一个读锁线程,专有字段进行记录
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//第一个获取读锁线程,重入
firstReaderHoldCount++;
} else {
//不是第一个获取读锁线程,不是当前线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
读锁释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//对获取第1个的线程进行判断,释放锁,缓存第一个获取锁的线程和计数,加快释放锁的速度
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//最后一个获取锁的计数器,缓存,也是为了最后一个获取锁的线程进行快速释放锁
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//释放锁,最后需要修改state状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}