在我的上一篇文章《面试官:谈一谈java中基于AQS的并发锁》中,讲到了ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等并发锁,以及Condition的使用和原理。
今天我们来聊一个JDK1.8中引入了的并发锁StampedLock,它跟其他的锁有什么优势呢?
初识StampedLock
让我们来看一下官方的示例:
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { //写锁 long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } double distanceFromOrigin() { //乐观读 long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; if (!sl.validate(stamp)) { stamp = sl.readLock(); try { currentX = x; currentY = y; } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { //锁升级 //下面的代码也可以在开始时使用乐观读,之后升级 long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = sl.tryConvertToWriteLock(stamp); if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { sl.unlock(stamp); } } }
从这个示例中我们能看出
1.除了悲观读写锁之外,StampedeLock支持乐观读。乐观读并没有真正获取到锁,因此不需要释放。
2.无论是乐观读还是悲观读写锁,都会返回一个stamp,释放的时候需要带这个stamp。
3.即使有线程在乐观读,当前线程还是可以获取到写锁的。只是乐观读的线程validate方法返回失败,这是就需要锁升级。
4.相比于ReentrantReadWriteLock这个读写锁,StampedLock的优势是可以支持乐观读,这样节省了首次获取锁的开销。
5.stamp可以进行锁升级和降级。
返回stamp的作用
下面的代码,线程1先进行乐观读,之后线程2获取到写锁,这是线程1验证stamp失败。
下面的代码,线程1先进行乐观读,之后线程2获取到写锁,这是线程1验证stamp失败。 public static void main(String[] args) { StampedLock sl = new StampedLock(); LocalThreadPool.run(() -> { try { long stamp = sl.tryOptimisticRead(); System.out.println(Thread.currentThread().getName() + "stamp:" + stamp); Thread.currentThread().sleep(2000);//睡眠2s,等待线程2获取到写锁 System.out.println(sl.validate(stamp));//线程2获取到写锁后验证失败 } catch (InterruptedException e) { e.printStackTrace(); } }); try { Thread.currentThread().sleep(1000);//主线程睡眠1s,目的是让线程2晚于线程1启动 } catch (InterruptedException e) { e.printStackTrace(); } LocalThreadPool.run(() -> { long stamp = sl.writeLock(); System.out.println(Thread.currentThread().getName() + "stamp:" + stamp); try { Thread.currentThread().sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { sl.unlockWrite(stamp); } }); }
上面的代码输出结果如下:
pool-1-thread-1stamp:256 pool-1-thread-2stamp:384 false
从上面的代码可以看出,stamp可以保证乐观读的过程中没有写锁加入,还有一个作用就是释放锁的时候需要带stamp。
public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;//如果存在写锁,返回0,否则返回 s & SBITS }
在创建StampedLock后,如果没有更改过上面代码中state的值,这个方法永远返回256,而且会验证成功
public boolean validate(long stamp) { U.loadFence(); return (stamp & SBITS) == (state & SBITS);//本质上就是验证stamp == state,而stamp返回256,state初始化值是256 } public StampedLock() { state = ORIGIN;//state的值初始化为256 }
为了好理解源码,我贴出StampedLock类中定义的一些常量,如下:
private static final long RUNIT = 1; private static final long WBIT = 128; private static final long RBITS = 127; private static final long RFULL = 126; private static final long ABITS = 255; private static final long SBITS = -128; private static final long ORIGIN = 256;//state的初始化值 private static final long INTERRUPTED = 1L; private static final int WAITING = -1; private static final int CANCELLED = 1; private static final int RMODE = 0; private static final int WMODE = 1; private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1; //多核CPU=64,单核CPU=1,我本地环境是4核,所以=64 private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1;//多核CPU=1024,单核CPU=1,我本地环境是4核,所以=1024
获取和释放写锁
获取写锁的过程跟ReentrantLock中获取独占锁的流程基本一致,先尝试获取锁,失败则进入队列。整个流程入下:
我们来看下这一段源代码
public long writeLock() { long next; return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L);//返回0,则进入队列 } public long tryWriteLock() { long s; //这个地方在初始化的情况下,state=256,所以第一次获取写锁时 (100000000 & 011111111) = 0, 但是如果第一个写锁还没有释放,第二次加写锁时 (384 & 255) = (110000000 & 011111111) = 128,返回128后直接进入队列 //从第3次开始获取写锁时 (128 & 255) = (010000000 & 011111111) = 128,之后这个值一直都是128,直到队列中的写锁释放 return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L; } private long tryWriteLock(long s) { // assert (s & ABITS) == 0L; long next; if (casState(s, next = s | WBIT)) { // 初始化第一次获取写锁会走到这儿 (100000000 | 010000000 ) = 384 (110000000) VarHandle.storeStoreFence(); return next;//初始情况下,state值没有被修改过,返回384 } return 0L; }
下面方法进入队列,如果只有一个写锁,是一个FIFO的队列,第一个head节点是一个写模式的虚拟节点,不获取锁,后面获取写锁的线程依次进入队列,如下图:
这部分源代码如下
下面方法进入队列,如果只有一个写锁,是一个FIFO的队列,第一个head节点是一个写模式的虚拟节点,不获取锁,后面获取写锁的线程依次进入队列,如下图: private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { //这个循环就是为了让当前节点进入等待队列 long m, s, ns; if ((m = (s = state) & ABITS) == 0L) {//这儿等于0说明state已经是256了(锁等待队列中已经没有元素了),当前元素无需入队,直接获取锁试试 if ((ns = tryWriteLock(s)) != 0L) return ns; } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; // spins = 64 else if (spins > 0) { --spins; Thread.onSpinWait(); } else if ((p = wtail) == null) { //初始化队列 WNode hd = new WNode(WMODE, null);//队列的头结点是一个写模式的节点 if (WHEAD.weakCompareAndSet(this, null, hd)) wtail = hd; } else if (node == null) node = new WNode(WMODE, p); else if (node.prev != p)//设置当前元素的前置节点是队尾元素 node.prev = p; else if (WTAIL.weakCompareAndSet(this, p, node)) {//放到队尾 p.next = node; break; } } boolean wasInterrupted = false; for (int spins = -1;;) {//这个循环目的就是不断的自旋,直到被唤醒 WNode h, np, pp; int ps; if ((h = whead) == p) {//head == tail,说明队列只有一个元素,尝试直接获取锁 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins; k > 0; --k) { // spin at head long s, ns; if (((s = state) & ABITS) == 0L) {//此时state=256,尝试获取写锁 if ((ns = tryWriteLock(s)) != 0L) {//获取写锁成功 whead = node; node.prev = null; if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } } else Thread.onSpinWait(); } } else if (h != null) { // help release stale waiters WNode c; Thread w; while ((c = h.cowait) != null) {//唤醒等待队列上的读锁队列,见下面的数据结构图 if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) && (w = c.thread) != null) LockSupport.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) {//当前节点的前置节点改为队尾节点 if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0)//修改状态为等待 WSTATUS.compareAndSet(p, 0, WAITING); else if (ps == CANCELLED) {//如果队尾元素取消,则将当前节点的前置节点设置为队尾的前一个节点 if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L)//这个获取锁传入0 time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) {//当前线程进入blocking状态等待唤醒 if (time == 0L) LockSupport.park(this); else LockSupport.parkNanos(this, time); } node.thread = null; if (Thread.interrupted()) { if (interruptible) return cancelWaiter(node, node, true);//取消获取锁 wasInterrupted = true; } } } } }
从上面的代码看出,获取写锁的逻辑还是挺复杂的,其中有2个自旋动作,相比ReentrantLock有一定的性能损耗。释放锁的流程比较简单,代码如下:
从上面的代码看出,获取写锁的逻辑还是挺复杂的,其中有2个自旋动作,相比ReentrantLock有一定的性能损耗。释放锁的流程比较简单,代码如下: public void unlockWrite(long stamp) { if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); unlockWriteInternal(stamp); } private long unlockWriteInternal(long s) { long next; WNode h; STATE.setVolatile(this, next = unlockWriteState(s));//state+128 if ((h = whead) != null && h.status != 0) release(h); return next; } private static long unlockWriteState(long s) { return ((s += WBIT) == 0L) ? ORIGIN : s; } private void release(WNode h) { if (h != null) { WNode q; Thread w; WSTATUS.compareAndSet(h, WAITING, 0);//修改等待状态为初始状态 if ((q = h.next) == null || q.status == CANCELLED) {//下一个节点为空或者取消状态,从后往前找到一个非空节点来唤醒 for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t;//这儿并没有break,所以找到的是离头结点最近的节点,为什么不从头到尾遍历呢? } if (q != null && (w = q.thread) != null) LockSupport.unpark(w);//唤醒后面等待节点 } }
获取和释放读锁
获取读锁的流程非常复杂,主要还是因为数据结构的原因。我们知道java中HashMap的底层数据结构是数组+链表来解决的,链表主要是为了解决hash冲突。而StampedLock数据结构有点类似,它采用的是一个FIFO的队列接一个LIFO的队列,而LIFO存放的是读锁的队列。
现在假如我们有1个线程来获取写锁,4个线程来获取读锁,这时又有1个线程来获取写锁,但是锁已经被占用需要进入队列,这时队列结构如下
public long readLock() { long s, next; return (whead == wtail && ((s = state) & ABITS) < RFULL && casState(s, next = s + RUNIT)) ? next : acquireRead(false, 0L);//入队 } private long acquireRead(boolean interruptible, long deadline) { boolean wasInterrupted = false; WNode node = null, p; for (int spins = -1;;) {//这个循环一直尝试直到入队成功 WNode h; if ((h = whead) == (p = wtail)) {//队列为空或头结点等于尾结点,死循环中不断尝试尝试获取读锁 for (long m, s, ns;;) { if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } else if (m >= WBIT) { if (spins > 0) { --spins; Thread.onSpinWait(); } else { if (spins == 0) { WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np))//一个周期结束还没有获取到锁,break后入队 break; } spins = SPINS; } } } } if (p == null) { //初始化队列,头结点mode是WMODE(写模式) WNode hd = new WNode(WMODE, null); if (WHEAD.weakCompareAndSet(this, null, hd)) wtail = hd; } else if (node == null) node = new WNode(RMODE, p); else if (h == p || p.mode != RMODE) {//尾结点是写模式,加入队尾 if (node.prev != p) node.prev = p; else if (WTAIL.weakCompareAndSet(this, p, node)) {//入队成功后跳出循环 p.next = node; break;//注意:整个方法由2个大的for循环,这个break跳出第一个大循环 } } else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node))//放入WCOWAIT的前面,所以是LIFO node.cowait = null; else { for (;;) { WNode pp, c; Thread w; if ((h = whead) != null && (c = h.cowait) != null && WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null) // help release LockSupport.unpark(w);//头结点WCOWAIT不为空,唤醒WCOWAIT读模式等待线程 if (Thread.interrupted()) { if (interruptible) return cancelWaiter(node, p, true); wasInterrupted = true; } if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do {//头结点是尾结点前置或者头尾节点相等或者尾结点前置节点是空,这时在循环中获取锁 if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } } while (m < WBIT);//一直到当前队列中没有写锁 } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) {//跳出当前循环继续最外层循环 node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) {//超时,取消 if (wasInterrupted) Thread.currentThread().interrupt(); return cancelWaiter(node, p, false); } Thread wt = Thread.currentThread(); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {//BLOCKED等待被唤醒 if (time == 0L) LockSupport.park(this); else LockSupport.parkNanos(this, time); } node.thread = null; } } } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) {//头结点等于尾结点 if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) {//唤醒当前读队列中的元素 if (WCOWAIT.compareAndSet(node, c, c.cowait) && (w = c.thread) != null) LockSupport.unpark(w); } if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } else if (m >= WBIT && --k <= 0) break; else Thread.onSpinWait(); } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) {//头结点不等于尾结点并且头结点不为空,唤醒读队列中元素 if (WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null) LockSupport.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node;//当前节点指向队尾 } else if ((ps = p.status) == 0) WSTATUS.compareAndSet(p, 0, WAITING); else if (ps == CANCELLED) {//尾结点取消,当前节点放入尾结点前置节点 if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L)//超时取消 return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {//前面是一个写锁,当前元素时读队列第一个了,入队后阻塞等待唤醒 if (time == 0L) LockSupport.park(this); else LockSupport.parkNanos(this, time); } node.thread = null; if (Thread.interrupted()) { if (interruptible) return cancelWaiter(node, node, true); wasInterrupted = true; } } } } }
从上面可以看到,获取读锁的代码非常复杂,有2个原因,一个是因为复杂的数据结构,另一个是因为有多次自旋来等待获取锁,所以这一块儿效率也会比较低。
下面我们看一下释放读锁锁的流程,释放读锁的流程比较简单,如下:
public void unlockRead(long stamp) { long s, m; WNode h; while (((s = state) & SBITS) == (stamp & SBITS) && (stamp & RBITS) > 0L && ((m = s & RBITS) > 0L)) {//循环不断的释放读锁 if (m < RFULL) { if (casState(s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h);//LIFO队列中读锁释放完后唤醒FIFO队列的下一个 return; } } else if (tryDecReaderOverflow(s) != 0L)//读锁队列已满,减掉一个 return; } throw new IllegalMonitorStateException(); }
锁的升级和降级
StampedLock支持锁升级和降级,主要有转化成乐观读,转化悲观读锁,转化成悲观写锁。有了前面的代码积累,理解这段代码也不是太难了。具体代码如下:
public long tryConvertToOptimisticRead(long stamp) {//转乐观读 long a, m, s, next; WNode h; VarHandle.acquireFence(); while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) {//当前线程持有写锁 if (s != stamp)//参数错误,退出循环 break; return unlockWriteInternal(s);//释放写锁 } else if (a == 0L)//当前线程只有乐观读,无须释放锁,直接返回 return stamp; else if ((m = s & ABITS) == 0L) //stamp无效,退出循环 break; else if (m < RFULL) {//当前线程持有读锁 if (casState(s, next = s - RUNIT)) {//释放读锁 if (m == RUNIT && (h = whead) != null && h.status != 0) release(h);//唤醒下一个节点 return next & SBITS; } } else if ((next = tryDecReaderOverflow(s)) != 0L) return next & SBITS; } return 0L;//退出循环后返回0,会导致validate失败 } public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) {//当前线程只存在乐观读 if (a != 0L) break; if ((next = tryWriteLock(s)) != 0L)//尝试获取写锁 return next; } else if (m == WBIT) {//当前线程持有写锁直接返回stamp if (a != m)//参数错误 break; return stamp; } else if (m == RUNIT && a != 0L) {//当前线程持有悲观读锁,直接转化state值,转为悲观写锁的state值 if (casState(s, next = s - RUNIT + WBIT)) { VarHandle.storeStoreFence(); return next; } } else break; } return 0L; } public long tryConvertToReadLock(long stamp) { long a, s, next; WNode h; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) {//当前线程持有写锁 if (s != stamp) break; STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT);//尝试直接释放写锁 if ((h = whead) != null && h.status != 0) release(h);//唤醒下一个节点 return next; } else if (a == 0L) {//当前线程只有乐观读,直接获取读锁 if ((s & ABITS) < RFULL) { if (casState(s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } else {//当前线程持有读锁,直接返回stamp if ((s & ABITS) == 0L) break; return stamp; } } return 0L; }
总结
StampedLock并不是基于AQS来实现的,乐观读能够让当前线程少一次直接尝试获取悲观读锁的时间开销,锁的升级和降级也能让获取锁更加灵活。
StampedLock是非公平锁,因为每次获取都会尝试自旋直接获取。
StampedLock是不可重入的,从上面的代码可以看出,如果当前线程2次获取写锁,第二次写锁会加入队列导致当前线程阻塞,这样第一次的写锁释放代码一直走不到,因为不能被唤醒。
由于自旋的原因,性能上会有一定的损耗,尤其是当前面的线程执行时间很长时,只能等到自旋获取失败,加入等待队列。
StampedLock的数据结构比较特别,整个等待队列是FIFO队列,但是读锁队列是LIFO队列。