【从入门到放弃-Java】并发编程-JUC-locks-ReentrantReadWriteLock

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。

前言

上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。
接下来我们一起来学习下ReentrantReadWriteLock。

ReentrantReadWriteLock

/**
 * Creates a new {@code ReentrantReadWriteLock} with
  * default (nonfair) ordering properties.
  */
 public ReentrantReadWriteLock() {
     this(false);
 }
 
 /**
  * Creates a new {@code ReentrantReadWriteLock} with
  * the given fairness policy.
  *
  * @param fair {@code true} if this lock should use a fair ordering policy
  */
 public ReentrantReadWriteLock(boolean fair) {
     sync = fair ? new FairSync() : new NonfairSync();
     readerLock = new ReadLock(this);
     writerLock = new WriteLock(this);
 }

我们可以看到和ReentrantLock一样,ReentrantReadWriteLock也使用了通过AQS实现的FairSync和NonfairSync模式
有两个成员变量锁ReadLock和WriteLock

ReadLock::lock

获取读锁,不死不休

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    //如果已经有写锁,且不是当前线程持有的,则加读锁失败
    //如果当前线程已经持有写锁,则可以获取读锁,这就是锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    /** 
     * 判断读线程是否阻塞,取决于队列的策略
     *   公平锁策略:如果当前同步队列不为空且当前线程不是队列的第一个节点,则阻塞。
     *   非公平锁策略:如果当前队列的第一个节点时写锁,则需要阻塞。这样是为了防止写锁饥饿。
     * 如果不需要阻塞,且读锁数未达到最大值 则尝试通过cas的方式获取锁
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //如果当前读锁为0,则当前线程获取锁
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如过第一个读锁的持有者是当前线程,则firstReaderHoldCount数量加一
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            //如果最后一个获取锁的线程不是当前线程
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                //获取当前线程的锁
                cachedHoldCounter = rh = readHolds.get();
            //如果当前最后一个线程获取锁数量为0,则将其设置为当前线程的holdcounter
            else if (rh.count == 0)
                readHolds.set(rh);
            //读锁数+1
            rh.count++;
        }
        return 1;
    }
    //尝试无限循环获取读锁
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //如果已经有写锁,且不是当前线程持有的,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        //如果需要阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        //如果当前线程持有的锁数为0,则移除
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取锁成功后,将当前线程从队列头结点移除
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

ReadLock::lockInterruptibly

获取读锁,直到成功或被中断

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //如果收到中断信号,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果尝试获取锁失败,则循环等待获取锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            //获取锁失败的话则需要进行中断检测,检测到中断信号则抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::tryLock

//尝试获取读锁,如果有写锁获取失败,则直接返回失败
public boolean tryLock() {
    return sync.tryReadLock();
}

@ReservedStackAccess
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

//尝试获取读锁,获取失败或者超时未获取到的话,则返回失败
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //排到当前线程的话则尝试获取锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //超时返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            
            //阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            //如果被中断
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::unlock

释放锁

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个持有读锁的
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //如果是唯一一个持有读锁的,则firstReader设置为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        //firstReaderHoldCount减一,
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        //如果不是最后一个持有读锁的线程
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            //从ThreadLocal获取readHolds
            rh = readHolds.get();
        int count = rh.count;
        //如果小于等于1,则移除readHolds
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //持有锁的数量减一
        --rh.count;
    }
    for (;;) {
        //将state设置为0,原因是在写锁降级为读锁后,释放读锁时,需要将state设为0,方便后续的写锁竞争。
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //如果头结点不是null,并且队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果当前结点是SIGNAL信号
            if (ws == Node.SIGNAL) {
                //唤醒头结点
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

WriteLock::lock

获取写锁,如果获取失败,则加入等待队列
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

public void lock() {
    sync.acquire(1);
}

WriteLock::lockInterruptibly

获取写锁,如果获取失败,则加入等待队列,直到获取到或被中断
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

WriteLock::tryLock

public boolean tryLock() {
    return sync.tryWriteLock();
}

@ReservedStackAccess
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    //如果存在写锁,且写锁不是当前线程持有的,则返回false
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //如果不存在写锁或是当前线程获取的写锁,则尝试将state加一
    if (!compareAndSetState(c, c + 1))
        return false;
    //设置持有写锁的线程为当前线程
    setExclusiveOwnerThread(current);
    return true;
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //和ReentrantLock的调用方法一样,不再赘述
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

WriteLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //如果不是当前线程持有的写锁,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //判断持有的写锁是否释放完毕
    boolean free = exclusiveCount(nextc) == 0;
    //如果释放完毕,则将当前持有锁的线程设置为null
    if (free)
        setExclusiveOwnerThread(null);
    //设置持有的锁数量减一
    setState(nextc);
    return free;
}

总结

通过源码分析,我们了解到,可以通过ReentrantReadWriteLock可以获取读锁和写锁。

  • 写锁是互斥锁,只能一个线程持有,写锁和ReentrantLock类似
  • 读锁是共享锁,可以多个线程同时持有。
  • 读锁通过firstReader和cachedHoldCounter优化获取、释放锁的性能。使用ThreadLocal readHolds存放所有持有锁线程的tid和持有锁数量。
  • 线程可以将自己持有的写锁降级为读锁,在释放读锁时,一起释放。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
5天前
|
安全 Java 调度
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第12天】 在现代软件开发中,多线程编程是提升应用程序性能和响应能力的关键手段之一。特别是在Java语言中,由于其内置的跨平台线程支持,开发者可以轻松地创建和管理线程。然而,随之而来的并发问题也不容小觑。本文将探讨Java并发编程的核心概念,包括线程安全策略、锁机制以及性能优化技巧。通过实例分析与性能比较,我们旨在为读者提供一套既确保线程安全又兼顾性能的编程指导。
|
5天前
|
数据采集 安全 Java
Java并发编程学习12-任务取消(上)
【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容
30 4
Java并发编程学习12-任务取消(上)
|
1天前
|
Java
Java一分钟之-并发编程:线程间通信(Phaser, CyclicBarrier, Semaphore)
【5月更文挑战第19天】Java并发编程中,Phaser、CyclicBarrier和Semaphore是三种强大的同步工具。Phaser用于阶段性任务协调,支持动态注册;CyclicBarrier允许线程同步执行,适合循环任务;Semaphore控制资源访问线程数,常用于限流和资源池管理。了解其使用场景、常见问题及避免策略,结合代码示例,能有效提升并发程序效率。注意异常处理和资源管理,以防止并发问题。
21 2
|
1天前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
15 2
|
2天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
12 5
|
2天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
2天前
|
安全 Java
Java一分钟之-并发编程:原子类(AtomicInteger, AtomicReference)
【5月更文挑战第18天】Java并发编程中的原子类如`AtomicInteger`和`AtomicReference`提供无锁原子操作,适用于高性能并发场景。`AtomicInteger`支持原子整数操作,而`AtomicReference`允许原子更新对象引用。常见问题包括误解原子性、过度依赖原子类以及忽略对象内部状态的并发控制。要避免这些问题,需明确原子操作边界,合理选择同步策略,并精确控制原子更新。示例代码展示了如何使用这两个类。正确理解和使用原子类是构建高效并发程序的关键。
12 1
|
2天前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
9 1
|
2天前
|
Java 编译器
Java并发编程中的锁优化策略
【5月更文挑战第18天】在Java并发编程中,锁是一种常用的同步机制,用于保护共享资源的访问。然而,不当的锁使用可能导致性能问题和死锁风险。本文将探讨Java中锁的优化策略,包括锁粗化、锁消除、锁分离和读写锁等技术,以提高并发程序的性能和可靠性。
|
3天前
|
Java 编译器
Java 并发编程中的锁优化策略
【5月更文挑战第17天】在 Java 并发编程中,锁是一种常见的同步机制,用于保护共享资源的访问。然而,不当使用锁可能导致性能问题和死锁风险。本文将探讨 Java 中的锁优化策略,包括锁粗化、锁消除、锁降级以及读写锁等技术,以提高并发程序的性能和可靠性。