【从入门到放弃-Java】并发编程-JUC-locks-ReentrantReadWriteLock

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。

前言

上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。
接下来我们一起来学习下ReentrantReadWriteLock。

ReentrantReadWriteLock

/**
 * Creates a new {@code ReentrantReadWriteLock} with
  * default (nonfair) ordering properties.
  */
 public ReentrantReadWriteLock() {
     this(false);
 }
 
 /**
  * Creates a new {@code ReentrantReadWriteLock} with
  * the given fairness policy.
  *
  * @param fair {@code true} if this lock should use a fair ordering policy
  */
 public ReentrantReadWriteLock(boolean fair) {
     sync = fair ? new FairSync() : new NonfairSync();
     readerLock = new ReadLock(this);
     writerLock = new WriteLock(this);
 }

我们可以看到和ReentrantLock一样,ReentrantReadWriteLock也使用了通过AQS实现的FairSync和NonfairSync模式
有两个成员变量锁ReadLock和WriteLock

ReadLock::lock

获取读锁,不死不休

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    //如果已经有写锁,且不是当前线程持有的,则加读锁失败
    //如果当前线程已经持有写锁,则可以获取读锁,这就是锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    /** 
     * 判断读线程是否阻塞,取决于队列的策略
     *   公平锁策略:如果当前同步队列不为空且当前线程不是队列的第一个节点,则阻塞。
     *   非公平锁策略:如果当前队列的第一个节点时写锁,则需要阻塞。这样是为了防止写锁饥饿。
     * 如果不需要阻塞,且读锁数未达到最大值 则尝试通过cas的方式获取锁
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //如果当前读锁为0,则当前线程获取锁
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如过第一个读锁的持有者是当前线程,则firstReaderHoldCount数量加一
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            //如果最后一个获取锁的线程不是当前线程
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                //获取当前线程的锁
                cachedHoldCounter = rh = readHolds.get();
            //如果当前最后一个线程获取锁数量为0,则将其设置为当前线程的holdcounter
            else if (rh.count == 0)
                readHolds.set(rh);
            //读锁数+1
            rh.count++;
        }
        return 1;
    }
    //尝试无限循环获取读锁
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //如果已经有写锁,且不是当前线程持有的,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        //如果需要阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        //如果当前线程持有的锁数为0,则移除
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取锁成功后,将当前线程从队列头结点移除
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

ReadLock::lockInterruptibly

获取读锁,直到成功或被中断

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //如果收到中断信号,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果尝试获取锁失败,则循环等待获取锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            //获取锁失败的话则需要进行中断检测,检测到中断信号则抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::tryLock

//尝试获取读锁,如果有写锁获取失败,则直接返回失败
public boolean tryLock() {
    return sync.tryReadLock();
}

@ReservedStackAccess
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

//尝试获取读锁,获取失败或者超时未获取到的话,则返回失败
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //排到当前线程的话则尝试获取锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //超时返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            
            //阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            //如果被中断
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::unlock

释放锁

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个持有读锁的
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //如果是唯一一个持有读锁的,则firstReader设置为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        //firstReaderHoldCount减一,
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        //如果不是最后一个持有读锁的线程
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            //从ThreadLocal获取readHolds
            rh = readHolds.get();
        int count = rh.count;
        //如果小于等于1,则移除readHolds
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //持有锁的数量减一
        --rh.count;
    }
    for (;;) {
        //将state设置为0,原因是在写锁降级为读锁后,释放读锁时,需要将state设为0,方便后续的写锁竞争。
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //如果头结点不是null,并且队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果当前结点是SIGNAL信号
            if (ws == Node.SIGNAL) {
                //唤醒头结点
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

WriteLock::lock

获取写锁,如果获取失败,则加入等待队列
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

public void lock() {
    sync.acquire(1);
}

WriteLock::lockInterruptibly

获取写锁,如果获取失败,则加入等待队列,直到获取到或被中断
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

WriteLock::tryLock

public boolean tryLock() {
    return sync.tryWriteLock();
}

@ReservedStackAccess
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    //如果存在写锁,且写锁不是当前线程持有的,则返回false
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //如果不存在写锁或是当前线程获取的写锁,则尝试将state加一
    if (!compareAndSetState(c, c + 1))
        return false;
    //设置持有写锁的线程为当前线程
    setExclusiveOwnerThread(current);
    return true;
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //和ReentrantLock的调用方法一样,不再赘述
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

WriteLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //如果不是当前线程持有的写锁,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //判断持有的写锁是否释放完毕
    boolean free = exclusiveCount(nextc) == 0;
    //如果释放完毕,则将当前持有锁的线程设置为null
    if (free)
        setExclusiveOwnerThread(null);
    //设置持有的锁数量减一
    setState(nextc);
    return free;
}

总结

通过源码分析,我们了解到,可以通过ReentrantReadWriteLock可以获取读锁和写锁。

  • 写锁是互斥锁,只能一个线程持有,写锁和ReentrantLock类似
  • 读锁是共享锁,可以多个线程同时持有。
  • 读锁通过firstReader和cachedHoldCounter优化获取、释放锁的性能。使用ThreadLocal readHolds存放所有持有锁线程的tid和持有锁数量。
  • 线程可以将自己持有的写锁降级为读锁,在释放读锁时,一起释放。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
34 0
|
3月前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第17天】本文详细介绍了Java编程中Map的使用,涵盖Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的并发处理和性能优化技巧,适合初学者和进阶者学习。
89 3
|
9天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
81 60
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
67 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
192 6
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
2月前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
51 2
|
2月前
|
存储 安全 Java
🌟Java零基础-反序列化:从入门到精通
【10月更文挑战第21天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
93 5