前言
之前已经说过了ReentrantLock ReentrantReadWriteLock,可以参考之前的博客。在ReentrantReadWriteLock源码解析文末,我提到了ReentrantReadWriteLock的缺点,就是无法避免写线程饥渴的问题,而今天要说的StampedLock提供了乐观读的API,解决了写饥渴的问题。
插播一些内容,我有个同事为了解决写饥饿的问题使用了StampedLock,而并没有用tryOptimisticRead(),单纯以为StampedLock解决了写饥饿的问题,但实际上不用tryOptimisticRead和直接使用ReentrantReadWriteLock没啥区别,都是悲观锁还是会有写饥渴的问题,而且个人感觉代码还会更复杂一些。所以我觉得在说StampedLock的具体实现之前,有必要先来看下StampedLock的正确使用方式。
public class StampedLockDemo { private StampedLock stampedLock = new StampedLock(); private int data = 0; public void writeData() { long stamp = stampedLock.writeLock(); try { data += 1; } finally { stampedLock.unlockWrite(stamp); } } public int readData() { long stamp = stampedLock.tryOptimisticRead(); // 1 int curData = this.data; if (!stampedLock.validate(stamp)) { // 2 try { stamp = stampedLock.readLock(); // 3 curData = this.data;; } finally { stampedLock.unlockRead(stamp); } } return curData; } }
和ReentrantReadWriteLock不一样的是StampedLock在加锁时都会给你有个戳(stamp),你可以认为这个stamp就是锁的版本号,这个stamp还不能丢,后续解锁时都得用到这个stamp,而这个stamp是用来确认之后锁状态是否有变化的标记,stamp的存在所以这个锁也叫StampedLock。回到上面的Demo。
在readData()中我加了两次锁,在1处首先获取了乐观锁,在2处校验了stamp,如果校验成功说明没有线程在此期间获取并释放过写锁,可以认为数目还没有被更成功更新过(后续有代码详解),否则可以认为有线程更新过数据,所以在3处直接重新获取读锁保证可以读到最新的数据。这里有另外一种写法,如下:
public int readData() { long stamp = stampedLock.tryOptimisticRead(); int curData = this.data; while(!stampedLock.validate(stamp)) { stamp = stampedLock.tryOptimisticRead(); curData = this.data; } return curData; }
这里没有加过readLock(),循环一直尝试用tryOptimisticRead(),直到成功。这种方式优点就是只用乐观锁保证写线程一直都不会被阻塞,但缺点是一直循环可能消耗性能,但在多读少写且写优先级非常高的情况下可以不考虑使用,但大多数情况下第一种方法已经满足性能需求了,所以我还是比较推荐第一种方式。
乐观锁 or 悲观锁
上文中多次提到了乐观锁和悲观锁,这里先科普下乐观锁和悲观锁的区别,对深入理解StampedLock很有帮助。
乐观锁:乐观地认为没有人会更新数据,所以不会独占资源,只是通过检查数据版本来确定数据是否有变化,所以加乐观锁之后并不会阻碍其他线程读写资源(主要是写),乐观锁只是一种概念,并没有实际加锁,所以也不需要显式释放锁。
悲观锁:悲观地认为数我在读数据时可能会有线程更新数据,导致我读到的数据是异常的,所以直接加独占锁,这种情况下会阻碍其他线程访问资源。另外悲观锁需要谨慎使用,否则可能导致发生死锁的情况。
源码分析
我将StampedLock的所有API按其功能划分为几类,见上图。
构造函数
public StampedLock() { state = ORIGIN; }
StampedLock并没有什么参数需要设置,所以构造函数非常简单,但看起来莫名其妙,为什么state直接等于256。不过可以推测StampedLock也是类似于ReentrantReadWriteLock使用一个state的二进制位来标识锁的状态,这里为了方便理解代码,我直接说明它是如何使用二进制位的。
StampedLock用了long型作为state,这里是将其64位划分为3部分使用。低7位作为读锁的标志位,可以由多个线程共享,每有一个线程加了读锁,低7位就加1,那是不是只能有127个读者?当然不是,还有其他机制可以额外记录超过127的读者。第8位是写锁位,由线程独占。其余位是stamp位,记录有没有写锁状态的变化,每使用一次写锁,stamped位就会增加1,相当于整个state加了256。
了解了以上信息,我们就可以大概猜测到StampedLock的运行机制了,接下我们结合代码来验证下我们的猜测。
读锁相关API
乐观读锁的实现
上面已经介绍过了乐观锁的含义了,既然StampedLock的特色就是乐观锁,所以我们先来看下乐观锁的实现。
public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; }
加锁很简单,就是返回了上图中的所有stamp位,没有任何状态的变化,虽然说说是加锁,但其实什么都没做,所以乐观读锁是不需要显式去释放的。乐观锁的使用原理就是只要stamp没有变,就认为数据没有变化,所以在上面的demo中用到了validate(stamp)方法来校验stamp有没有变化,代码也很简单。
public boolean validate(long stamp) { VarHandle.acquireFence(); return (stamp & SBITS) == (state & SBITS); }
那究竟什么情况下会导致stamp位变化?实际上stamp位变化只有一个入口,就是
private static long unlockWriteState(long s) { return ((s += WBIT) == 0L) ? ORIGIN : s; }
这个方法只有两个地方会调用到 tryConvertToReadLock()和unlockWrite(),也就是写锁解除的时候。这个也很好理解,只要有人用过了写锁,就可以简单粗暴地认为数据有更新了。
读锁的获取
public long readLock() { long s, next; // bypass acquireRead on common uncontended case return (whead == wtail // 1 && ((s = state) & ABITS) < RFULL // 2 && casState(s, next = s + RUNIT)) // 3 ? next // 4 : acquireRead(false, 0L); // 5 }
大多数人对于读锁最常用的API肯定就是readLock()了,这个方法保证一定加锁成功,看下他的具体执行步骤。
请求写锁队列为空。
没有加写锁且读锁的数量小于256。
CAS成功更新读锁的状态。
如果1 2 3条件都满足说明读锁加成功了,返回当前的state作为stamp。
否则调用acquireRead()方法获取锁。
acquireRead()中封装好了读锁溢出、自旋、随机探测、阻塞等方法,非常复杂,我们把这个硬骨头放到后面,先来看下tryReadLock()的实现。
public long tryReadLock() { long s, m, next; while ((m = (s = state) & ABITS) != WBIT) { // 1 if (m < RFULL) { // 2 if (casState(s, next = s + RUNIT)) // 3 return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) // 4 return next; } return 0L; // 5 }
这里tryReadLock并不保证加锁成功,使用时要注意。
除非有写锁,否则一直尝试,这里说明写锁的优先级还是高于读锁的。
是否已经加锁126次,如果没有126次说明没加满跳到步骤3,否则得用其他方式记录读锁的状态,跳到步骤4。
cas更新读锁状态成功后返回stamp。
调用tryIncReaderOverflow()在读锁溢出的情况下记录读锁状态。
加锁失败,返回0。
这里有个新方法,tryIncReaderOverflow(),因为在state中读锁只用了7个二进制位,所以最大加锁次数只有127,但实际使用中读线程的数据可能远高于127个,怎么办呢!
private long tryIncReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; if ((s & ABITS) == RFULL) { // 1 if (casState(s, s | RBITS)) { ++readerOverflow; STATE.setVolatile(this, s); return s; } } else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) // 2 Thread.yield(); else // 3 Thread.onSpinWait(); return 0L; }
StampedLock的方式是引入额外的变量readerOverflow来记录多出来的读锁,这里直接将127作为溢出的标识,所以读锁到126的时候已经就算满。读锁满了之后就直接加到readerOverflow,这是专为读锁准备的int型计数,不怕不够用了。 tryIncReaderOverflow中除了正常的加锁外,还有两个奇怪的操作。
如果读锁已经到126个,尝试用cas更新锁状态。
否则以随机的概率主动放弃cpu。
或者CPU自旋等待。
步骤2和3很难理解,仔细想想如果已经调用了tryIncReaderOverflow()表示读锁已经满了,但在这里又看到没满,说明在这期间已经有其他线程放弃了读锁,稍微让线程等待一会,然后重新尝试。
public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { long s, m, next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { if ((m = (s = state) & ABITS) != WBIT) { if (m < RFULL) { if (casState(s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } if (nanos <= 0L) return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireRead(true, deadline)) != INTERRUPTED) return next; } throw new InterruptedException(); }
tryReadLock还可以让你指定获取锁的等待时长,部分代码就是前面tryReadLock无参数的代码,但关于设置等待时长的逻辑最终还是调用了acquireRead(),看来是时候看下acquireRead的具体实现了,因为代码很长。。。。
因为acquireRead里涉及很多排队的机制,为了方便理解代码,我们首先来了解下Stamped排队的机制,如下图,图片来自死磕 java同步系列之StampedLock源码解析。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9pj3sc0Z-1584864591658)(http://note.youdao.com/yws/res/35346/AED4652E8A60421AA677123BCF2F03DC)]
排队过程中并不是说来一个节点就排一个,最终形成一条单链,而是所有的读节点都可以和它前面连续的读合并到一条副链里,因为读锁不是互斥的,n个线程可以同时获取。大家都是来要读锁的好兄弟,来来来我们一起排。
private long acquireRead(boolean interruptible, long deadline) { boolean wasInterrupted = false; WNode node = null, p; for (int spins = -1;;) { // 自旋1 WNode h; if ((h = whead) == (p = wtail)) { // 等待队列为空时 for (long m, s, ns;;) { // 自旋2 if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { // 尝试用cas加读锁,如果超过126次读锁,调用tryIncReaderOverflow来加额外读锁 if (wasInterrupted) Thread.currentThread().interrupt(); return ns; // 加锁成功,直接返回stamp } else if (m >= WBIT) { //上面加锁可能失败,原因可能是加锁时被别人先加了写锁 if (spins > 0) { --spins; Thread.onSpinWait(); // 等待下次加锁的机会 } else { if (spins == 0) { WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np)) // spins为0就意味着要判断是否结束自旋2了 // 等待队列为空结束自旋2 break; } spins = SPINS; // 开启新一轮的自旋1 } } } } // 能到这,说明加锁失败了,线程要准备准备给自己排队了 if (p == null) { // 初始化等待队列,谁第一个排队还得负责帮忙建立排队场所 WNode hd = new WNode(WMODE, null); if (WHEAD.weakCompareAndSet(this, null, hd)) // 自己就是队头了 wtail = hd; } else if (node == null) // 发现有地方可以排队了,快准备给自己占个位 node = new WNode(RMODE, p); else if (h == p || p.mode != RMODE) { // 这里就要考虑自己是不是落单的读请求了 if (node.prev != p) // 如果前面没有一起读的兄弟,自己就先占个位吧 node.prev = p; else if (WTAIL.weakCompareAndSet(this, p, node)) { p.next = node; break; } } else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)) // 如果前面有读的兄弟,尝试加入他的等待队伍,对应到上图的副链. node.cowait = null; else { for (;;) { // 自旋3 和兄弟拼伙的时候可能失败了,多线程导致cas失败,没关系多试几次, WNode pp, c; Thread w; if ((h = whead) != null && (c = h.cowait) != null && WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null) // help release LockSupport.unpark(w); if (Thread.interrupted()) { if (interruptible) return cancelWaiter(node, p, true); wasInterrupted = true; } if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { // 拼伙过程中发现轮到自己了,那就尝试加读锁呗 if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) { if (wasInterrupted) Thread.currentThread().interrupt(); return cancelWaiter(node, p, false); // 如果设置了等待时间,超是就从等待队列自动退出 } Thread wt = Thread.currentThread(); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) { if (time == 0L) LockSupport.park(this); // 还没到自己就挂起一段时间 else LockSupport.parkNanos(this, time); } node.thread = null; } } } } // 上面是尝试加入等待队列时的情况,下面是在等待队列里轮到自己时的情况 for (int spins = -1;;) { // 自旋4 WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { //自旋5 long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? casState(s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { // 加锁尝试 WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (WCOWAIT.compareAndSet(node, c, c.cowait) && (w = c.thread) != null) // 通知和自己一起排队的兄弟们.轮到我们了, 就是调起副链上cowait的所有节点 LockSupport.unpark(w); } if (wasInterrupted) Thread.currentThread().interrupt(); return ns; } else if (m >= WBIT && --k <= 0) break; else Thread.onSpinWait(); } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null) LockSupport.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) WSTATUS.compareAndSet(p, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) { if (time == 0L) LockSupport.park(this); else LockSupport.parkNanos(this, time); } node.thread = null; if (Thread.interrupted()) { if (interruptible) return cancelWaiter(node, node, true); wasInterrupted = true; } } } } }
读锁的获取过程比较艰辛,代码太复杂了,原谅我没有完全看懂,我尽我最大努力在代码上加了一写注释,有兴趣的读者可以自己尝试去理解下,这篇文章还是专注在StampedLock的整体设计上吧。
读锁的释放
锁的释放就很简单了,就是通过cas将锁状态位修改回来,当然首先得校验stamp的合法性,就是将加锁的过程反向操作一遍,比较简单。
public void unlockRead(long stamp) { long s, m; WNode h; while (((s = state) & SBITS) == (stamp & SBITS) && (stamp & RBITS) > 0L && ((m = s & RBITS) > 0L)) { if (m < RFULL) { // 如前文锁说 if (casState(s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); return; } } else if (tryDecReaderOverflow(s) != 0L) return; } throw new IllegalMonitorStateException(); }