StampedLock源码分析(1)

简介: 之前已经说过了ReentrantLock ReentrantReadWriteLock,可以参考之前的博客。在ReentrantReadWriteLock源码解析文末,我提到了ReentrantReadWriteLock的缺点,就是无法避免写线程饥渴的问题,而今天要说的StampedLock提供了乐观读的API,解决了写饥渴的问题。

前言

之前已经说过了ReentrantLock ReentrantReadWriteLock,可以参考之前的博客。在ReentrantReadWriteLock源码解析文末,我提到了ReentrantReadWriteLock的缺点,就是无法避免写线程饥渴的问题,而今天要说的StampedLock提供了乐观读的API,解决了写饥渴的问题。


插播一些内容,我有个同事为了解决写饥饿的问题使用了StampedLock,而并没有用tryOptimisticRead(),单纯以为StampedLock解决了写饥饿的问题,但实际上不用tryOptimisticRead和直接使用ReentrantReadWriteLock没啥区别,都是悲观锁还是会有写饥渴的问题,而且个人感觉代码还会更复杂一些。所以我觉得在说StampedLock的具体实现之前,有必要先来看下StampedLock的正确使用方式。


public class StampedLockDemo {
    private StampedLock stampedLock = new StampedLock();
    private int data = 0;
    public void writeData() {
        long stamp = stampedLock.writeLock();
        try {
            data += 1;
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }
    public int readData() {
        long stamp = stampedLock.tryOptimisticRead();  // 1
        int curData = this.data;
        if (!stampedLock.validate(stamp)) {   // 2
            try {
                stamp = stampedLock.readLock();  // 3  
                curData = this.data;;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return curData;
    }
}


和ReentrantReadWriteLock不一样的是StampedLock在加锁时都会给你有个戳(stamp),你可以认为这个stamp就是锁的版本号,这个stamp还不能丢,后续解锁时都得用到这个stamp,而这个stamp是用来确认之后锁状态是否有变化的标记,stamp的存在所以这个锁也叫StampedLock。回到上面的Demo。


在readData()中我加了两次锁,在1处首先获取了乐观锁,在2处校验了stamp,如果校验成功说明没有线程在此期间获取并释放过写锁,可以认为数目还没有被更成功更新过(后续有代码详解),否则可以认为有线程更新过数据,所以在3处直接重新获取读锁保证可以读到最新的数据。这里有另外一种写法,如下:


public int readData() {
        long stamp = stampedLock.tryOptimisticRead();
        int curData = this.data;
        while(!stampedLock.validate(stamp)) {
            stamp = stampedLock.tryOptimisticRead();
            curData = this.data;
        }
        return curData;
    }


这里没有加过readLock(),循环一直尝试用tryOptimisticRead(),直到成功。这种方式优点就是只用乐观锁保证写线程一直都不会被阻塞,但缺点是一直循环可能消耗性能,但在多读少写且写优先级非常高的情况下可以不考虑使用,但大多数情况下第一种方法已经满足性能需求了,所以我还是比较推荐第一种方式。


乐观锁 or 悲观锁

上文中多次提到了乐观锁和悲观锁,这里先科普下乐观锁和悲观锁的区别,对深入理解StampedLock很有帮助。


乐观锁:乐观地认为没有人会更新数据,所以不会独占资源,只是通过检查数据版本来确定数据是否有变化,所以加乐观锁之后并不会阻碍其他线程读写资源(主要是写),乐观锁只是一种概念,并没有实际加锁,所以也不需要显式释放锁。

悲观锁:悲观地认为数我在读数据时可能会有线程更新数据,导致我读到的数据是异常的,所以直接加独占锁,这种情况下会阻碍其他线程访问资源。另外悲观锁需要谨慎使用,否则可能导致发生死锁的情况。

源码分析


我将StampedLock的所有API按其功能划分为几类,见上图。


构造函数

 

public StampedLock() {
        state = ORIGIN;
    }


StampedLock并没有什么参数需要设置,所以构造函数非常简单,但看起来莫名其妙,为什么state直接等于256。不过可以推测StampedLock也是类似于ReentrantReadWriteLock使用一个state的二进制位来标识锁的状态,这里为了方便理解代码,我直接说明它是如何使用二进制位的。


StampedLock用了long型作为state,这里是将其64位划分为3部分使用。低7位作为读锁的标志位,可以由多个线程共享,每有一个线程加了读锁,低7位就加1,那是不是只能有127个读者?当然不是,还有其他机制可以额外记录超过127的读者。第8位是写锁位,由线程独占。其余位是stamp位,记录有没有写锁状态的变化,每使用一次写锁,stamped位就会增加1,相当于整个state加了256。


了解了以上信息,我们就可以大概猜测到StampedLock的运行机制了,接下我们结合代码来验证下我们的猜测。


读锁相关API

乐观读锁的实现

上面已经介绍过了乐观锁的含义了,既然StampedLock的特色就是乐观锁,所以我们先来看下乐观锁的实现。


public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }


加锁很简单,就是返回了上图中的所有stamp位,没有任何状态的变化,虽然说说是加锁,但其实什么都没做,所以乐观读锁是不需要显式去释放的。乐观锁的使用原理就是只要stamp没有变,就认为数据没有变化,所以在上面的demo中用到了validate(stamp)方法来校验stamp有没有变化,代码也很简单。


 

public boolean validate(long stamp) {
        VarHandle.acquireFence();
        return (stamp & SBITS) == (state & SBITS);
    }


那究竟什么情况下会导致stamp位变化?实际上stamp位变化只有一个入口,就是


 

private static long unlockWriteState(long s) {
        return ((s += WBIT) == 0L) ? ORIGIN : s;
    }


这个方法只有两个地方会调用到 tryConvertToReadLock()和unlockWrite(),也就是写锁解除的时候。这个也很好理解,只要有人用过了写锁,就可以简单粗暴地认为数据有更新了。


读锁的获取

 

public long readLock() {
        long s, next;
        // bypass acquireRead on common uncontended case
        return (whead == wtail  // 1
                && ((s = state) & ABITS) < RFULL  // 2
                && casState(s, next = s + RUNIT)) // 3
            ? next     // 4
            : acquireRead(false, 0L);  // 5
    }


大多数人对于读锁最常用的API肯定就是readLock()了,这个方法保证一定加锁成功,看下他的具体执行步骤。


请求写锁队列为空。

没有加写锁且读锁的数量小于256。

CAS成功更新读锁的状态。

如果1 2 3条件都满足说明读锁加成功了,返回当前的state作为stamp。

否则调用acquireRead()方法获取锁。

acquireRead()中封装好了读锁溢出、自旋、随机探测、阻塞等方法,非常复杂,我们把这个硬骨头放到后面,先来看下tryReadLock()的实现。


 

public long tryReadLock() {
        long s, m, next;
        while ((m = (s = state) & ABITS) != WBIT) {  // 1 
            if (m < RFULL) {   // 2
                if (casState(s, next = s + RUNIT))  // 3
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)  // 4
                return next;
        }
        return 0L;   // 5
    }


这里tryReadLock并不保证加锁成功,使用时要注意。


除非有写锁,否则一直尝试,这里说明写锁的优先级还是高于读锁的。

是否已经加锁126次,如果没有126次说明没加满跳到步骤3,否则得用其他方式记录读锁的状态,跳到步骤4。

cas更新读锁状态成功后返回stamp。

调用tryIncReaderOverflow()在读锁溢出的情况下记录读锁状态。

加锁失败,返回0。

这里有个新方法,tryIncReaderOverflow(),因为在state中读锁只用了7个二进制位,所以最大加锁次数只有127,但实际使用中读线程的数据可能远高于127个,怎么办呢!

 

private long tryIncReaderOverflow(long s) {
        // assert (s & ABITS) >= RFULL;
        if ((s & ABITS) == RFULL) {  // 1
            if (casState(s, s | RBITS)) {
                ++readerOverflow;
                STATE.setVolatile(this, s);
                return s;
            }
        }
        else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)   // 2
            Thread.yield();
        else  // 3
            Thread.onSpinWait();
        return 0L;
    }


StampedLock的方式是引入额外的变量readerOverflow来记录多出来的读锁,这里直接将127作为溢出的标识,所以读锁到126的时候已经就算满。读锁满了之后就直接加到readerOverflow,这是专为读锁准备的int型计数,不怕不够用了。 tryIncReaderOverflow中除了正常的加锁外,还有两个奇怪的操作。


如果读锁已经到126个,尝试用cas更新锁状态。

否则以随机的概率主动放弃cpu。

或者CPU自旋等待。

步骤2和3很难理解,仔细想想如果已经调用了tryIncReaderOverflow()表示读锁已经满了,但在这里又看到没满,说明在这期间已经有其他线程放弃了读锁,稍微让线程等待一会,然后重新尝试。

 

public long tryReadLock(long time, TimeUnit unit)
        throws InterruptedException {
        long s, m, next, deadline;
        long nanos = unit.toNanos(time);
        if (!Thread.interrupted()) {
            if ((m = (s = state) & ABITS) != WBIT) {
                if (m < RFULL) {
                    if (casState(s, next = s + RUNIT))
                        return next;
                }
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            if (nanos <= 0L)
                return 0L;
            if ((deadline = System.nanoTime() + nanos) == 0L)
                deadline = 1L;
            if ((next = acquireRead(true, deadline)) != INTERRUPTED)
                return next;
        }
        throw new InterruptedException();
    }



tryReadLock还可以让你指定获取锁的等待时长,部分代码就是前面tryReadLock无参数的代码,但关于设置等待时长的逻辑最终还是调用了acquireRead(),看来是时候看下acquireRead的具体实现了,因为代码很长。。。。


因为acquireRead里涉及很多排队的机制,为了方便理解代码,我们首先来了解下Stamped排队的机制,如下图,图片来自死磕 java同步系列之StampedLock源码解析。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9pj3sc0Z-1584864591658)(http://note.youdao.com/yws/res/35346/AED4652E8A60421AA677123BCF2F03DC)]

排队过程中并不是说来一个节点就排一个,最终形成一条单链,而是所有的读节点都可以和它前面连续的读合并到一条副链里,因为读锁不是互斥的,n个线程可以同时获取。大家都是来要读锁的好兄弟,来来来我们一起排。


private long acquireRead(boolean interruptible, long deadline) {
        boolean wasInterrupted = false;
        WNode node = null, p;
        for (int spins = -1;;) {   // 自旋1
            WNode h;
            if ((h = whead) == (p = wtail)) {   // 等待队列为空时
                for (long m, s, ns;;) {  // 自旋2
                    if ((m = (s = state) & ABITS) < RFULL ?
                            casState(s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        // 尝试用cas加读锁,如果超过126次读锁,调用tryIncReaderOverflow来加额外读锁
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return ns;   // 加锁成功,直接返回stamp
                    }
                    else if (m >= WBIT) { //上面加锁可能失败,原因可能是加锁时被别人先加了写锁
                        if (spins > 0) {
                            --spins;
                            Thread.onSpinWait();  // 等待下次加锁的机会
                        }
                        else {
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))  // spins为0就意味着要判断是否结束自旋2了
                                    // 等待队列为空结束自旋2
                                    break;
                            }
                            spins = SPINS; // 开启新一轮的自旋1
                        }
                    }
                }
            }
            // 能到这,说明加锁失败了,线程要准备准备给自己排队了
            if (p == null) { // 初始化等待队列,谁第一个排队还得负责帮忙建立排队场所
                WNode hd = new WNode(WMODE, null);
                if (WHEAD.weakCompareAndSet(this, null, hd))  // 自己就是队头了
                    wtail = hd;
            }
            else if (node == null)  // 发现有地方可以排队了,快准备给自己占个位
                node = new WNode(RMODE, p);
            else if (h == p || p.mode != RMODE) {   // 这里就要考虑自己是不是落单的读请求了
                if (node.prev != p)                 // 如果前面没有一起读的兄弟,自己就先占个位吧
                    node.prev = p;
                else if (WTAIL.weakCompareAndSet(this, p, node)) {
                    p.next = node;
                    break;
                }
            }
            else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node))
                // 如果前面有读的兄弟,尝试加入他的等待队伍,对应到上图的副链.
                node.cowait = null;
            else {
                for (;;) {   // 自旋3  和兄弟拼伙的时候可能失败了,多线程导致cas失败,没关系多试几次,
                    WNode pp, c; Thread w;
                    if ((h = whead) != null && (c = h.cowait) != null &&
                            WCOWAIT.compareAndSet(h, c, c.cowait) &&
                            (w = c.thread) != null) // help release
                        LockSupport.unpark(w);
                    if (Thread.interrupted()) {
                        if (interruptible)
                            return cancelWaiter(node, p, true);
                        wasInterrupted = true;
                    }
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {  // 拼伙过程中发现轮到自己了,那就尝试加读锁呗  
                            if ((m = (s = state) & ABITS) < RFULL ?
                                    casState(s, ns = s + RUNIT) :
                                    (m < WBIT &&
                                            (ns = tryIncReaderOverflow(s)) != 0L)) {
                                if (wasInterrupted)
                                    Thread.currentThread().interrupt();
                                return ns;
                            }
                        } while (m < WBIT);
                    }
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L) {
                            if (wasInterrupted)
                                Thread.currentThread().interrupt();
                            return cancelWaiter(node, p, false);   // 如果设置了等待时间,超是就从等待队列自动退出 
                        }
                        Thread wt = Thread.currentThread();
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                                whead == h && p.prev == pp) {
                            if (time == 0L)
                                LockSupport.park(this);  // 还没到自己就挂起一段时间   
                            else
                                LockSupport.parkNanos(this, time);
                        }
                        node.thread = null;
                    }
                }
            }
        }
        // 上面是尝试加入等待队列时的情况,下面是在等待队列里轮到自己时的情况  
        for (int spins = -1;;) {   // 自旋4
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { //自旋5 
                    long m, s, ns;   
                    if ((m = (s = state) & ABITS) < RFULL ?
                            casState(s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {  // 加锁尝试  
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        while ((c = node.cowait) != null) {
                            if (WCOWAIT.compareAndSet(node, c, c.cowait) &&
                                    (w = c.thread) != null)  // 通知和自己一起排队的兄弟们.轮到我们了, 就是调起副链上cowait的所有节点  
                                LockSupport.unpark(w);
                        }
                        if (wasInterrupted)
                            Thread.currentThread().interrupt();
                        return ns;
                    }
                    else if (m >= WBIT && --k <= 0)
                        break;
                    else
                        Thread.onSpinWait();
                }
            }
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (WCOWAIT.compareAndSet(h, c, c.cowait) &&
                            (w = c.thread) != null)
                        LockSupport.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    WSTATUS.compareAndSet(p, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    node.thread = wt;
                    if (p.status < 0 &&
                            (p != h || (state & ABITS) == WBIT) &&
                            whead == h && node.prev == p) {
                        if (time == 0L)
                            LockSupport.park(this);
                        else
                            LockSupport.parkNanos(this, time);
                    }
                    node.thread = null;
                    if (Thread.interrupted()) {
                        if (interruptible)
                            return cancelWaiter(node, node, true);
                        wasInterrupted = true;
                    }
                }
            }
        }
    }



读锁的获取过程比较艰辛,代码太复杂了,原谅我没有完全看懂,我尽我最大努力在代码上加了一写注释,有兴趣的读者可以自己尝试去理解下,这篇文章还是专注在StampedLock的整体设计上吧。


读锁的释放

锁的释放就很简单了,就是通过cas将锁状态位修改回来,当然首先得校验stamp的合法性,就是将加锁的过程反向操作一遍,比较简单。


public void unlockRead(long stamp) {
        long s, m; WNode h;
        while (((s = state) & SBITS) == (stamp & SBITS)
               && (stamp & RBITS) > 0L
               && ((m = s & RBITS) > 0L)) {
            if (m < RFULL) {   // 如前文锁说
                if (casState(s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                return;
        }
        throw new IllegalMonitorStateException();
    }
目录
相关文章
|
6月前
|
设计模式 Java API
StampedLock源码分析(2)
StampedLock也提供了单独读锁和写锁的封装类WriteLockView和ReadLockView,它俩存在的意义就是只讲锁的部分暴露出去,防止外部接口错误加解锁,我觉得符合软件设计模式中的单一职责和接口隔离原则。
36 1
|
7月前
|
Java
【ReentrantReadWriteLock的实现原理】
【ReentrantReadWriteLock的实现原理】
|
6天前
|
安全 Java
Java并发编程:Synchronized及其实现原理
Java并发编程:Synchronized及其实现原理
25 4
|
6天前
|
存储 Java
StampedLock(戳记锁)源码解读与使用
StampedLock(戳记锁)源码解读与使用
ReentrantReadWriteLock源码分析
ReentrantReadWriteLock源码分析
|
设计模式 Java API
StampedLock源码分析
StampedLock源码分析
71 0
StampedLock源码分析
|
设计模式 Java
JUC并发编程——AQS源码解读
JUC并发编程——AQS源码解读
152 0
JUC并发编程——AQS源码解读
并发编程基础ReentrantLock源码分析
并发编程基础ReentrantLock源码分析
77 0
|
存储 Java
Java并发之AQS源码分析(一)
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
105 0
Java并发之AQS源码分析(一)
|
安全 Java
Java并发之AQS源码分析(二)
我在 Java并发之AQS源码分析(一)这篇文章中,从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,如果你把这部分搞明白了,再看共享锁的实现原理,思路就会清晰很多。下面我们继续从源码中窥探共享锁的实现原理。
126 0
Java并发之AQS源码分析(二)