快进来!花几分钟看一下 ReentrantReadWriteLock 的原理!

简介: 在看完 ReentrantLock 之后,在高并发场景下 ReentrantLock 已经足够使用,但是因为 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而很多应用场景都是读多写少,这时候使用 ReentrantLock 就不太合适了。读多写少的场景该如何使用?在 JUC 包下同样提供了读写锁 ReentrantReadWriteLock 来应对读多写少的场景。


前言


在看完 ReentrantLock 之后,在高并发场景下 ReentrantLock 已经足够使用,但是因为 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而很多应用场景都是读多写少,这时候使用 ReentrantLock 就不太合适了。读多写少的场景该如何使用?在 JUC 包下同样提供了读写锁 ReentrantReadWriteLock 来应对读多写少的场景。


介绍


支持类似 ReentrantLock 语义的 ReadWriteLock 的实现。

具有以下属性:

  • 获取顺序

此类不会将读取优先或写入优先强加给锁访问的排序。但是,它确实支持可选的公平 策略。

支持公平模式非公平模式,默认为非公平模式

  • 重入

允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读锁或写锁。在写线程释放持有的所有写锁后,reader 才允许重入使用它们。此外,writer 可以获取读锁,但反过来则不成立。

  • 锁降级

重入还允许从写锁降级为读锁,通过先获取写锁,然后获取读锁,最后释放写锁的方式降级。但是,从读锁升级到写锁是不可能的

  • 锁获取的中断

读锁和写锁都支持锁获取期间的中断。

  • Condition 支持

写锁提供了一个 Condition 实现,对于写锁来说,该实现的方式与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写锁。读锁不支持 Condition

  • 监测

此类支持一些确定是保持锁还是争用锁的方法。这些方法设计用于监视系统状态,而不是同步控制。

锁最多支持 65535 个递归写锁和 65535 个读锁

以上为 Java Api 官方文档[1] 的解释,总结一下内容如下:

  1. 支持非公平和公平模式,默认为非公平模式。
  2. 支持重入,读锁可以重入获取读锁,写锁可以重入获取写锁,写锁可以获取读锁,读锁不可以获取写锁。
  3. 锁可以降级,从写锁降级为读锁,但是不可能从读锁升级到写锁。


基本使用

class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        // 读锁加锁
        rwl.readLock().lock();
        if (!cacheValid) {
            // 获取写锁之前必须释放读锁
            rwl.readLock().unlock();
            // 写锁加锁
            rwl.writeLock().lock();
            try {
                // 重新检查状态,因为另一个线程可能
                // 在执行操作之前获取了写锁定并更改了状态
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // 通过在释放写锁之前获取读锁来降级
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

上面只是官方文档提供的一个 demo。


问题疑问

  1. 在 ReentrantReadWriteLock 中 state 代表什么?
  2. 线程获取锁的流程是怎么样的?
  3. 读锁和写锁的可重入性是如何实现的?
  4. 当前线程获取锁失败,被阻塞的后续操作是什么?
  5. 锁降级是怎么降级的?


源码分析


代码结构

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 提供读锁的内部类 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 提供写锁的内部类 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 执行所有同步机制 */
    final Sync sync;
}


state

之前在阅读 ReentrantLock 源码的时候 state 代表了锁的状态,0 表示没有线程持有锁,大于 1 表示已经有线程持有锁及其重入的次数。而在 ReentrantReadWriteLock 是读写锁,那就需要保存读锁写锁两种状态的,那是怎么样表示的呢?

在 ReentrantReadWriteLock 中同样存在一个 Sync 继承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父类。内部定义了 state 的一些操作。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    // 移位数
    static final int SHARED_SHIFT   = 16;
    // 单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大数量 1 << 16 -> 65536
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 计算独占数使用 1 << 16 -> 65536
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // 返回共享保留数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 返回独占保留数 
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

在 AQS 中定义 state 为 int 类型,而在 ReentrantReadWriteLock 中,将 state 的 高 16 位和低 16 位拆开表示读写锁。其中高 16 位表示读锁,低 16 位表示写锁。分别使用 sharedCount 和 exclusiveCount 方法获取读锁和写锁的当前状态。

下面分别从读锁和写锁的角度来看如何进行加锁和释放锁的?


ReadLock.lock

public static class ReadLock 
    implements Lock, java.io.Serializable {
    /**
     * 获取读取锁。
     * 如果写锁没有被另一个线程持有,则获取读锁并立即返回。
     * 如果写锁由另一个线程持有,则出于线程调度目的,
     * 当前线程将被禁用,并处于休眠状态,直到获取读锁为止。
     */
    public void lock() {
        // 调用 AQS 获取共享资源
        sync.acquireShared(1);
    }
}

获取共享资源,这块使用的 AQS 的逻辑,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中实现的。并且 AQS 中有规定,tryAcquireShared 分为三种返回值:

  1. 小于 0: 表示失败;
  2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
  3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。
abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        // 获取 state 值
        int c = getState();
        // 独占计数不为 0 且 不是当前线程, 说明已经有写锁
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1;
        // 获取共享计数(读锁计数)
        int r = sharedCount(c);
        // 不需要阻塞读锁 && 共享计数小于最大值 && state 更新成功
        if (!readerShouldBlock() && r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                // 当前读锁计数为 0
                // firstReader是获得读锁的第一个线程
                // firstReaderHoldCount是firstReader的保持计数
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 读锁重入
                firstReaderHoldCount++;
            } else {
                // 当前缓存计数
                HoldCounter rh = cachedHoldCounter;
                // 当前线程没有计数 或者 没有创建计数器
                if (rh == null || rh.tid != getThreadId(current))
                    // 创建计数,基于 ThreadLocal
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) 
                    readHolds.set(rh);
                // 计数累加
                rh.count++;
            }
            return 1;
        }
        // 完整地获取共享锁方法,作为tryAcquireShared方法因CAS获取锁失败后的处理。
        // 因为前面可能失败 CAS 失败, 队列策略失败等原因。
        return fullTryAcquireShared(current);
    }
}
  1. 先获取 state ,通过 exclusiveCount 方法获取到写锁的计数值,不为 0 且 不是当前线程, 说明已经有写锁。返回 -1 失败。
  2. 通过 sharedCount 获取读锁计数,判断是否需要阻塞以及是否超过上限后,使用 CAS 更新 读锁计数。
  3. 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。
  4. 最后会进行完整的获取共享锁方法,作为之前获取失败的后续处理方法。

firstReader:firstReader是获得读锁的第一个线程; firstReaderHoldCount:firstReaderHoldCount是firstReader的保持计数。即获得读锁的第一个线程的重入次数。 cachedHoldCounter:最后一个获得读锁的线程获得读锁的重入次数。

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 无限循环
    for (;;) {
        int c = getState();
        // 是否有写锁
        if (exclusiveCount(c) != 0) {
            // 有写锁,但是不是当前线程,直接返回失败
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 需要阻塞
            // 没有写锁,确保没有重新获取读锁
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 当前线程的读锁计数 ThreadLocal 中
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        // 计数结束,remove 掉
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 为 0 直接失败
                if (rh.count == 0)
                    return -1;
            }
        }
        // 到达上限 抛出异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS 设置读锁
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
复制代码
  1. 首先会一直循环
  2. 有写锁,但是不是当前线程,直接返回失败。但是,有写锁,如果是当前线程,是会继续执行的。
  3. 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。

当存在写锁(独占锁)时,方法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 方法,循环获取资源。doAcquireShared 方法会不断循环,尝试获取读锁,一旦获取到读锁,当前节点会立即唤醒后续节点,后续节点开始尝试获取读锁,依次传播。


ReadLock.unlock

public static class ReadLock 
    implements Lock, java.io.Serializable {
    public void unlock() {
        sync.releaseShared(1);
    }
}


调用 AQS 的 releaseShared 释放共享资源方法。


其中 tryReleaseShared 有 ReadLock 实现。

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 第一个线程是当前线程
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 第一个线程不是当前线程,更新自己的 ThreadLocal 里面的计数
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 循环
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // 使用 CAS 更新 state
        if (compareAndSetState(c, nextc))
            // 但是如果现在读和写锁都已释放,
            // 它可能允许等待的写程序继续进行。
            return nextc == 0;
    }
}
  1. 如果是第一个线程,直接更新技术,不是则更新自己 ThreadLocal 里面保存的计数。
  2. 循环,使用 CAS 更新 state 的值。
  3. 如果 state 更新后的值为 0,说明没有线程持有读锁或者写锁了。
  4. 当 state 为 0,此时会调用 AQS 的 doReleaseShared 方法。此时队列如果有写锁,那就会被写锁获取的锁。


WriteLock.lock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    /**
     * 获取写入锁。
     * 如果没有其他线程持有读锁或写锁,会直接返回,并将写锁计数设置为1。
     * 如果当前线程持有写锁,则将写锁计数 +1,然后返回。
     * 如果锁正在被其他线程持有,则当前线程用于线程调度目的,
     * 当前线程将被禁用,并处于休眠状态,直到获取读锁并将写锁计数设置为1。
     */
    public void lock() {
        sync.acquire(1);
    }
}


tryAcquire 方法由 Write 自己实现,方式和 ReentrantLock 类似。

protected final boolean tryAcquire(int acquires) {
    // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。
    // 如果计数饱和,则失败。只有在count不为零时,才可能发生这种情况。
    // 否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。
    // 如果是这样,请更新状态并设置所有者。
    Thread current = Thread.currentThread();
    int c = getState();
    // 写锁计数
    int w = exclusiveCount(c);
    // c != 0 说明有有线程获取锁了
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 判断是不是自己,不是自己 返回 false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 判断有没有超过上限
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 重入
        setState(c + acquires);
        return true;
    }
    // 不需要阻塞,或者 CAS 更新 state 失败
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  1. 获取 state , 如果 state 不为 0 则判断是否为当前线程重入获取。
  2. state 为 0 ,则当前线程 CAS 更新 state,获取锁。
  3. 更新成功之后绑定当前线程。
  4. 如果失败会继续调用 AQS 的 acquireQueued,将当前阻塞放在 AQS 队列中。AQS 会不断循环,等待上一个锁释放后,尝试获得锁。


WriteLock.unlock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    // 如果当前线程是此锁的持有者,则保持计数递减。 
    // 如果保持现在的计数为零,则解除锁定。 
    // 如果当前线程不是此锁的持有者则IllegalMonitorStateException异常。
    public void unlock() {
        sync.release(1);
    }
}


同样这块代码是使用 AQS 的逻辑,tryRelease 部分由 WriteLock 自己实现。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  1. 如果是当前线程重入,扣减重入次数。
  2. 扣减后如果为 0,则设置锁持有线程为 null,更新 state 值。AQS 会唤醒后续节点获取锁。


总结


问题

Q: 在 ReentrantReadWriteLock 中 state 代表什么?

A: state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。


Q: 线程获取锁的流程是怎么样的?

A: 可以参考上面的源码笔记,以及后面的流程图。


Q: 读锁和写锁的可重入性是如何实现的?

A: 在加锁的时候,判断是否为当前线程,如果是当前线程,则直接累加计数。值得注意的是:读锁重入计数使用的 ThreadLocal 在线程中缓存计数,而写锁则直接用的 state 进行累加(其实和 state 低 16 位进行累加一样)。


Q: 当前线程获取锁失败,被阻塞的后续操作是什么?

A: 获取失败,会放到 AQS 等待队列中,在队列中不断循环,监视前一个节点是否为 head ,是的话,会重新尝试获取锁。


Q: 锁降级是怎么降级的?

A: 如图,在圈出部分 fullTryAcquireShared 代码中,可以看出来,在获取读锁的时候,如果当前线程持有写锁,是可以获取读锁的。这块就是指锁降级,比如线程 A 获取到了写锁,当线程 A 执行完毕时,它需要获取当前数据,假设不支持锁降级,就会导致 A 释放写锁,然后再次请求读锁。而在这中间是有可能被其他阻塞的线程获取到写锁的。从而导致线程 A 在一次执行过程中数据不一致。


小结


  1. ReentrantReadWriteLock 读写锁,内部实现是 ReadLock 读锁 和 WriteLock 写锁。读锁,允许共享;写锁,是独占锁。
  2. 读写锁都支持重入,读锁的重入次数记录在线程维护的 ThreadLocal 中,写锁维护在 state 上(低 16 位)。
  3. 支持锁降级,从写锁降级为读锁,防止脏读。
  4. ReadLock 和 WriteLock 都是通过 AQS 来实现的。获取锁失败后会放到 AQS 等待队列中,后续不断尝试获取锁。区别在读锁只有存在写锁的时候才放到等待队列,而写锁是只要存在非当前线程锁(无论写锁还是读锁)都会放到等待队列。!
  5. 通过源码分析,可以得出读写锁适合在读多写少的场景中使用。
目录
相关文章
太难了!面试官居然要我停止一个正在运行的线程?
停止一个线程意味着在任务处理完任务之前停掉正在做的操作,也就是放弃当前的操作。停止一个线程可以用Thread.stop()方法,但最好不要用它。虽然它确实可以停止一个正在运行的线程,但是这个方法是不安全的,而且是已被废弃的方法。
|
消息中间件 JavaScript 小程序
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
新来个阿里 P7,仅花 2 小时,撸出一个多线程永动任务,看完直接跪了,真牛逼!
|
算法 uml
一文简单全面了解策略模式的使用【花几分钟轻松掌握一个知识点】
您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦。 本文重点:介绍策略模式概念以及实际应用。 干货满满,建议收藏,需要用到时常看看。小伙伴们如有问题及需要,欢迎踊跃留言哦~ ~ ~
166 0
一文简单全面了解策略模式的使用【花几分钟轻松掌握一个知识点】
|
监控 Java 程序员
十分钟掌握java多线程进阶
十分钟掌握java多线程进阶
144 0
十分钟掌握java多线程进阶
|
算法 Java Linux
工作这么久了,还不懂多线程吗?
浩哥Java多线程整理学习系列之01基础知识整理
115 0
工作这么久了,还不懂多线程吗?
|
存储 安全 Java
小白也能看懂的锁升级过程和锁状态
小白也能看懂的锁升级过程和锁状态
276 0
小白也能看懂的锁升级过程和锁状态
|
监控 算法 Java
花了好几个晚上整理的JVM知识点,吐血献出(一)(下)
花了好几个晚上整理的JVM知识点,吐血献出(一)(下)
161 0
花了好几个晚上整理的JVM知识点,吐血献出(一)(下)
|
存储 缓存 安全
花了好几个晚上整理的JVM知识点,吐血献出(一)(上)
花了好几个晚上整理的JVM知识点,吐血献出(一)
130 0
花了好几个晚上整理的JVM知识点,吐血献出(一)(上)
J3
|
存储 安全 Java
synchronized解析及锁膨胀过程,面试再也不怕了
synchronized解析及锁膨胀过程,面试再也不怕了
J3
555 0
synchronized解析及锁膨胀过程,面试再也不怕了
|
Java Go C#
面试官没想到,一个 Java 线程生命周期,我可以扯半小时
面试官:你不是精通 Java 并发吗?从基础的 Java 线程生命周期开始讲讲吧。 好的,面试官。吧啦啦啦... 如果要说 Java 线程的生命周期的话,那我觉得就要先说说操作系统的线程生命周期 因为 JVM 是跑在操作系统上面的嘛,所以是绕不过去的,而且可以说, Java 语言中的线程本质上就是操作系统的线程
面试官没想到,一个 Java 线程生命周期,我可以扯半小时