Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析

简介: 我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。

我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。

 

首先我们先看一下,ReentrantReadWriteLock 内部构造先看下它的类图结构如下图所示:

如上图可以看到读写锁内部维护了一个ReadLock和WriteLock,并且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁的实现,我们知道AQS里面维护了一个state状态,

而ReentrantReadWriteLock 则需要维护读状态和写状态,一个state是无法表示写和读状态的。ReentrantReadWriteLock 巧妙的使用 state 的高 16 位表示读状态,

也就是获取该读锁的线程个数,低 16 位 表示获取到写锁的线程的可重入次数。并通过CAS对其进行操作实现了读写分离,在读多写少的场景下比较适用。

接下来用一张图来加深对 ReentrantReadWriteLock 的理解:

 

 

首先我们先看ReentrantReadWriteLock 的内部类Sync的一些关键属性和方法,源码如下:

static final int SHARED_SHIFT   = 16;

//共享锁(读锁)状态单位值65536 
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

//排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//用来记录最后一个获取读锁的线程获取读锁的可重入次数
private transient HoldCounter cachedHoldCounter;
//用来记录第一个获取到读锁的线程
private transient Thread firstReader;
//用来记录第一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReadHoldCount;
//用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数
private transient ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter();
/** 返回读锁线程数 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回写锁可重入个数 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

类图中 firstReader用来记录第一个获取到读锁的线程,firstReadHoldCount则记录第一个获取到读锁的线程获取读锁的可重入数。cachedHoldCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数。

接下我们进入ReentrantReadWriteLock 的内部类Sync的内部类HoldCounter类的源码,如下:

static final class HoldCounter {
       int count = 0;
       //线程id
       final long tid = getThreadId(Thread.currentThread());
   }

 

readHolds 是ThreadLocal 变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数,可知ThreadLocalHoldCounter继承了ThreadLocal,里面initialValue方法返回一个HoldCounter对象,源码如下:

  static final class ThreadLocalHoldCounter
       extends ThreadLocal<HoldCounter> {
       public HoldCounter initialValue() {
           return new HoldCounter();
       }
   }

 

接下来进行写锁的获取与释放讲解,如下:

ReentrantReadWriteLock 中写锁是使用的 WriteLock 来实现的。我们先看一下写锁WriteLock的获取与释放方法,如下:

  1.void lock() 写锁是个独占锁,同时只有一个线程可以获取该锁。 如果当前没有线程获取到读锁和写锁则当前线程可以获取到写锁然后返回。 如果当前已经有线程取到读锁和写锁则当前线程则当前请求写锁线程会被阻塞挂起。

另外写锁是可重入锁,如果当前线程已经获取了该锁,再次获取的只是简单的把可重入次数加一后直接返回。源码如下:

  public void lock() {
       sync.acquire(1);
   }
   public final void acquire(int arg) {
        // sync重写的tryAcquire方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如上代码,lock()内部调用了AQS的acquire方法,其中的tryAcquire是ReentrantReadWriteLock 内部 sync 类重写的,代码如下:

  protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            //(1) c!=0说明读锁或者写锁已经被某线程获取
            if (c != 0) {
                (2//w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
               (3//说明某线程获取了写锁,判断可重入个数
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");

               (4// 设置可重入数量(1)
                setState(c + acquires);
                return true;
            }

           (5//第一个写线程获取写锁
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
   }

如上代码(1),如果当AQS状态值不为0 则说明当前已经有线程获取到了读锁或者写锁,代码(2)如果w == 0 说明状态值的低 16 为0,而状态值不为0,则说明高16位不为0,这暗示已经有线程获取了读锁,所以直接返回false。

如果w != 0 说明当前已经有线程获取了该写锁,则看当前线程是不是该锁的持有者,如果不是则返回false。

执行到代码(3)说明当前线程之前获取到了该锁,则判断该线程的可重入此时是不是超过了最大值,是则抛异常,否则执行代码(4)增加当前线程的可重入次数,然后返回true。

如果AQS的状态值等于0,则说明目前没有线程获取到读锁和写锁,则实行代码(5),

其中对于ReentrantReadWriteLock的子类NofairSync的writerShouldBlock方法的非公平锁的实现源码如下:

final boolean writerShouldBlock() {
       return false; // writers can always barge
   }

如代码对于非公平锁来说固定返回false,则说明代码(5)抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程返回true,否则返回false。

 

对于对于ReentrantReadWriteLock的子类FairSync的writerShouldBlock方法的公平锁的实现源码如下:

final boolean writerShouldBlock() {
  return hasQueuedPredecessors();
}

可知还是使用 hasQueuedPredecessors 来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限直接返回 false。

 

  2.void lockInterruptibly() 类似 lock() 方法,不同在于该方法对中断响应,也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException,源码如下:

    public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
       }

 

  3.boolean tryLock() 尝试获取写锁,如果当前没有其它线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回 true。 如果当前已经其它线程持有写锁或者读锁则该方法直接返回 false,当前线程并不会被阻塞。

如果当前线程已经持有了该写锁则简单增加 AQS 的状态值后直接返回 true。源码如下:

  public boolean tryLock( ) {
       return sync.tryWriteLock();
   }
  final boolean tryWriteLock() {
       Thread current = Thread.currentThread();
       int c = getState();
       if (c != 0) {
           int w = exclusiveCount(c);
           if (w == 0 || current != getExclusiveOwnerThread())
               return false;
           if (w == MAX_COUNT)
               throw new Error("Maximum lock count exceeded");
       }
       if (!compareAndSetState(c, c + 1))
           return false;
       setExclusiveOwnerThread(current);
       return true;
   }

如上代码与tryAcquire 方法类似,这里不再讲述,不同在于这里使用的非公平策略

 

  4.boolean tryLock(long timeout, TimeUnit unit) 与 tryAcquire()不同在于多了超时时间的参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回 false。

另外该方法对中断响应, 也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException。源码如下:

public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

 

  5.void unlock() 尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS 状态值减一,如果减去 1 后当前状态值为 0 则当前线程会释放对该锁的持有,否者仅仅减一而已。

如果当前线程没有持有该锁调用了该方法则会抛出 IllegalMonitorStateException 异常 ,源码如下:

public void unlock() {
    sync.release(1);
}
  public final boolean release(int arg) {
        //调用ReentrantReadWriteLock中sync实现的tryRelease方法
        if (tryRelease(arg)) {
            //激活阻塞队列里面的一个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
           //(6) 看是否是写锁拥有者调用的unlock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
           //(7)获取可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
           //(8)如果写锁可重入值为0则释放锁,否者只是简单更新状态值。
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
      }

如上代码 tryRelease 首先通过 isHeldExcusively判断是否当前线程是该写锁的持有者,如果不是则抛异常,否则执行代码(7)说明当前线程持有写锁,持有写锁说明状态值的高16位为0,所以这里nextc值就是当前线程写锁的剩余可重入次数。

代码(8)判断当前可重入次数是否为0,如果free为true 说明可重入次数为0,则当前线程会释放对写锁的持有,当前锁的持有者设置为null。如果free 为false,则简单更新可重入次数。

  

 

前面讲解了写锁的获取与释放,接下来讲解读锁的获取与释放,如下:

ReentrantReadWriteLock 中写锁是使用的 ReadLock 来实现的。主要看ReadLock读锁的获取与释放的主要方法,如下:

  1.void lock() 获取读锁,如果当前没有其它线程持有写锁,则当前线程可以获取读锁,AQS 的高 16 位的值会增加 1,然后方法返回。否者如果其它有一个线程持有写锁,则当前线程会被阻塞。源码如下:

public void lock() {
   sync.acquireShared(1);
}
  public final void acquireShared(int arg) {
        //调用ReentrantReadWriteLock中的sync的tryAcquireShared方法
        if (tryAcquireShared(arg) < 0)
           //调用AQS的doAcquireShared方法
            doAcquireShared(arg);
    }

如上代码读锁的lock方法调用了AQS的aquireShared方法,内部调用了 ReentrantReadWriteLock 中的 sync 重写的 tryAcquireShared 方法,源码如下:

protected final int tryAcquireShared(int unused) {

   //(1)获取当前状态值
    Thread current = Thread.currentThread();
    int c = getState();

    //(2)判断是否写锁被占用
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

    //(3)获取读锁计数
    int r = sharedCount(c);
    //(4)尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //(5)第一个线程获取读锁
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //(6)如果当前线程是第一个获取读锁的线程
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //(7)记录最后一个获取读锁的线程或记录其它线程读锁的可重入数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //(8)类似tryAcquireShared,但是是自旋获取
    return fullTryAcquireShared(current);
}

如上代码,首先获取了当前AQS的状态值,然后代码(2)看是否有其他线程获取到了写锁,如果是则直接返回了-1,然后调用AQS的doAcquireShared 方法把当前线程放入阻塞队列。

否则执行到代码(3)得到获取到读锁的线程个数,到这里要说明目前没有线程获取到写锁,但是还是有可能有线程持有读锁,然后执行代码(4),非公平锁的readerShouldBlock实现代码如下:

final boolean readerShouldBlock() {
     return apparentlyFirstQueuedIsExclusive();
}
 final boolean apparentlyFirstQueuedIsExclusive() {
   Node h, s;
   return (h = head) != null && (s = h.next)  != null && !s.isShared() && s.thread != null;
  }

如上代码作用是如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是的话,则当前县城使用判断当前获取读锁线程是否达到了最大值,最后执行CAS操作设置AQS状态值的高 16 位值增加 1。

代码(5)(6)记录第一个获取读锁的线程,并统计该线程获取读锁的可重入次数,代码(7)使用cachedHoldCounter 记录最后一个获取到读锁的线程,并同时该线程获取读锁的可重入次数,另外readHolds记录了当前线程获取读锁的可重入次数。

如果readerShouldBlock 返回 true 则说明有线程正在获取写锁,则执行代码(8)fullTryAcquireShared 代码与 tryAcquireShared 类似,不同在于前者是通过循环自旋获取。

源码如下:

    final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    ////
                } else if (readerShouldBlock()) {
                    // 
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // 
                    }
                    return 1;
                }
            }
        }

 

  2.void lockInterruptibly() 类似 lock() 方法,不同在于该方法对中断响应,也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException。

 

  3.boolean tryLock() 尝试获取读锁,如果当前没有其它线程持有写锁,则当前线程获取写锁会成功,然后返回 true。如果当前已经其它线程持有写锁则该方法直接返回 false,当前线程并不会被阻塞。

如果其它获取当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。代码类似 tryLock 这里不再讲述。

 

  4.boolean tryLock(long timeout, TimeUnit unit) 与 tryLock()不同在于多了超时时间的参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到读锁则返回 false。

另外该方法对中断响应, 也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException

 

  5.void unlock() 释放锁。源码如下:

public void unlock() {
   sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个获取读锁线程
    if (firstReader == current) {
        //如果可重入次数为1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else//否者可重入次数减去1
            firstReaderHoldCount--;
    } else {
        //如果当前线程不是最后一个获取读锁线程,则从threadlocal里面获取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        //如果可重入次数<=1则清除threadlocal
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //可重入次数减去一
        --rh.count;
    }

    //循环直到自己的读计数-1 cas更新成功
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))

            return nextc == 0;
    }
}

 

 

好了,到目前为止,我们知道了上一篇笔记中,使用ReentrantLock 实现的线程安全的 list, 但是由于 ReentrantLock 是独占锁所以在读多写少的情况下性能很差,下面使用 ReentrantReadWriteLock 来改造为如下代码:

package com.hjc;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created by cong on 2018/6/14.
 */
public class ReentrantReadWriteLockTest {
    //线程不安全的list
    private ArrayList<String> array = new ArrayList<String>();
    //独占锁
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    //添加元素
    public void add(String e) {

        writeLock.lock();
        try {
            array.add(e);

        } finally {
            writeLock.unlock();

        }
    }
    //删元素
    public void remove(String e) {

        writeLock.lock();
        try {
            array.remove(e);

        } finally {
            writeLock.unlock();

        }
    }

    //获取数据
    public String get(int index) {

        readLock.lock();
        try {
            return array.get(index);

        } finally {
            readLock.unlock();

        }
    }
}

如代码调用 get 方法适合使用的是读锁,这样运行多个读线程同时访问 list 的元素,在读多写少的情况下性能相比 ReentrantLock 会很好。

目录
相关文章
|
18天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
22天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
61 12
|
19天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
105 2
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
Java
Java多线程进一步的理解------------实现读写锁
public class ReadAndWriteLock { public static void main(String[] args) { final QueueJ q = new Queu...
1428 0
|
6天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
44 17
|
16天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
2天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
18天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
18天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。