ReentrantReadWriteLock源码分析

简介: ReentrantReadWriteLock源码分析

构造方法分析


//无参构造方法调用有参构造方法,并且传入一个参数为false
public ReentrantReadWriteLock() {
        this(false);//----->跟进去
    }


//fair为false,表示创建一个非公平的AQS
public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        //创建一个ReadLock,并且把this(ReentrantReadWriteLock)当做参数传入
        readerLock = new ReadLock(this);//----->跟进去
        writerLock = new WriteLock(this);
    }


//参数lock为上面创建的ReentrantReadWriteLock
protected ReadLock(ReentrantReadWriteLock lock) {
            //此时把ReentrantReadWriteLock里的属性sync引用赋值给ReadLock里的属性sync
            sync = lock.sync;
        }


到这其实构造方法就已经看完了,此处想表达的一点是:(ReentrantReadWriteLock里的属性sync)、(reentrantReadWriteLock.readLock的属性sync)和(reentrantReadWriteLock.writeLock的属性sync)是同一个sync(extends AbstractQueuedSynchronizer)


ReentrantReadWriteLock的规定


A获取读锁后,A可以获取读锁


A获取读锁后,A不可以获取写锁


A获取写锁后,A可以获取读锁


A获取写锁后,A可以获取写锁


A获取读锁后,B可以获取读锁


A获取读锁后,B不可以获取写锁


A获取写锁后,B不可以获取读锁


A获取写锁后,B不可以获取写锁


综上:在不考虑重入的情况下,如果A获得读锁,那么其他线程可以获得读锁,但是不能获取写锁(阻塞);如果A获得写锁,那么其他线程读写锁都获取不到(阻塞)


读写高低16位


      AQS 的状态state是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。


1.png


//高低16位,这个就是一常量       
static final int SHARED_SHIFT   = 16;
//上读锁需要在高16位添加1,也就是需要添加一个17位(1后面16个0)的数
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//读锁或者写锁运行最大的上锁数,以写锁为例,最大是16位,那么写锁的最大值为就是15个1
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
//state变量  & 这个值就能获得低16的数值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;


举一个例子


初始化state = 0,其中1-16号代码写锁,17-18号代表读锁


2.png


在state=0的基础之上加一个写锁,其实就是state+1


3.png


在state=0的基础之上加一个读锁,其实就是state+65536(二进制是1后面16个0)


4.png


获取读锁


把state向右移16位,如下图所示,就能获得读锁的数量,


static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }


5.png


获取写锁 ,state  & EXCLUSIVE_MASK 就能获取第16位的值


static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }


6.png


读锁上锁过程


在没有读锁和写锁的情况申请读锁


 ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        //在没有读锁和写锁的情况下申请读锁
        reentrantReadWriteLock.readLock().lock();//----->跟lock方法
        //释放读锁
        reentrantReadWriteLock.readLock().unlock();


public void lock() {
            sync.acquireShared(1);//----->跟进去,参数为1
        }


public final void acquireShared(int arg) {//arg=1
        if (tryAcquireShared(arg) < 0)//----->跟进去,arg=1
            doAcquireShared(arg);
    }


protected final int tryAcquireShared(int unused) {
            //获取当前线程
            Thread current = Thread.currentThread();
            //获取AQS里的属性state,初始化为0
            int c = getState();
            //exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0
            //sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0
                exclusiveCount(c) != 0 
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)getExclusiveOwnerThread()记录上读锁的线程,如果没有读锁,返回null
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1
                if (r == 0) {
                   //略。。。
                } else if (firstReader == current) {
                    //略。。。
                } else {
                    //略。。。
                }
                return 1;//当前情况下最终的返回结果
            }
            return fullTryAcquireShared(current);
        }


public final void acquireShared(int arg) {//arg=1
        if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法
            doAcquireShared(arg);
    }


此时上读锁成功


锁上读锁后再申请读锁的情况


多线程断点调试


new Thread(() -> {
            reentrantReadWriteLock.readLock().lock();
            try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
            System.out.println("t1 logic ");
            reentrantReadWriteLock.readLock().unlock();
        }, "t1").start();
        new Thread(() -> {
            reentrantReadWriteLock.readLock().lock();
            try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
            System.out.println("t2 logic ");
            reentrantReadWriteLock.readLock().unlock();
        }, "t2").start();


        public void lock() {
            sync.acquireShared(1);//----->跟进去,参数为1
        }


public final void acquireShared(int arg) { 
        if (tryAcquireShared(arg) < 0)//----->跟进去,arg = 1
            doAcquireShared(arg);
    }


 protected final int tryAcquireShared(int unused) {
            //获取当前线程
            Thread current = Thread.currentThread();
            //我们的假设的是原来有一把读锁,那么c=state=65536
            int c = getState();
            //exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0
            //sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1
                if (r == 0) {
                    //略。。。
                } else if (firstReader == current) {
                    //略。。。
                } else {
                    //略。。。
                }
                return 1;//最后返回的结果
            }
            return fullTryAcquireShared(current);
        }


public final void acquireShared(int arg) {//arg=1
        if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法
            doAcquireShared(arg);
    }


此时上读锁成功


锁上写锁后再申请读锁的情况


多线程断点调试


1.        new Thread(() -> {
            reentrantReadWriteLock.writeLock().lock();
            try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
            System.out.println("t1 logic ");
            reentrantReadWriteLock.writeLock().unlock();
        }, "t1").start();
        new Thread(() -> {
            reentrantReadWriteLock.readLock().lock();//----->跟进去,lock方法
            try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
            System.out.println("t2 logic ");
            reentrantReadWriteLock.readLock().unlock();
        }, "t2").start();


        public void lock() {
            sync.acquireShared(1);----->跟进去,参数为1
        }


    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)----->跟进去,arg=1
            doAcquireShared(arg);
    }


protected final int tryAcquireShared(int unused) {
            //获得当前线程
            Thread current = Thread.currentThread();
            //获得state的值
            int c = getState();
            if (exclusiveCount(c) != 0 //假设条件为有读锁,所以exclusiveCount(c) != 0 为真
                &&
                getExclusiveOwnerThread() != current) //AQS会记录申请成功的读锁的线程,如果申请写锁成功的线程再次申请读锁(可重入),getExclusiveOwnerThread() != current  为假, 否则为真,此处为真。
                return -1;//最后的返回结果
            //略
        }


    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//----->跳出来,tryAcquireShared(arg)=-1
            doAcquireShared(arg);//----->跟进去,arg=1
    }


//把当前线程封装到node节点中,并且插入到AQS的链表尾,再一次尝试获取锁,如果获取成功,把当前节点设置为头节点;否则就告诉当线程挂起。
private void doAcquireShared(int arg) {
        //给AQS里的链表添加节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获得当前节点的前驱节点
                final Node p = node.predecessor();
                if (p == head) {//如果是头节点
                    //尝试获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//如果获取成功
                        setHeadAndPropagate(node, r);//设置当前节点为头节点,并且唤醒后继的读节点
                        p.next = null; 
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;//方法的唯一出口,但是你要明白中间是当前线程是有挂起的
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node)//设置node的prev节点的一个参数,告诉prev需要唤醒node
                    &&
                    parkAndCheckInterrupt())//调用LockSupport.park(thred)挂起线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


现在为止,其实就该线程就已经挂起了(并且把当前线程存到node节点中),因为前面有写锁,所以不能获取锁。等写锁释放后就会调用LockSupport.unpark()继续运行上面代码的线程。


读锁解锁过程


            reentrantReadWriteLock.readLock().lock();
            try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
            System.out.println("t1 logic ");
            reentrantReadWriteLock.readLock().unlock();----->跟进去.unlock()方法


        public void unlock() {
            sync.releaseShared(1);//----->跟进去,参数为1
        }


 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//----->跟进去,参数为1
            doReleaseShared();
            return true;
        }
        return false;
    }


protected final boolean tryReleaseShared(int unused) {
            //获取当前线程
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                //略。。。
            } else {
                //略。。。
            }
            for (;;) {
                //获得state的数据
                int c = getState();
                //释放读锁就是在高16位减1,即c-65536
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))//CAS方式设置state的新值
                    //nextc即state的值,state==0,表明无读锁和无写锁
                    return nextc == 0;
            }
        }


 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//----->跳出来,如果state==0,执行下面方法运行其它线程加锁,否则就不执行
            doReleaseShared();//如果state==0,唤醒AQS里链表里的后继节点
            return true;
        }
        return false;
    }


此时读锁释放锁成功,其实逻辑很简单,用CAS算法给state的高16位减1,如果state==0,唤醒后继节点,否则什么也不做。


写锁上锁过程


此处就像上面的写的那样了,就简单介绍几个方法


public final void acquire(int arg) {//arg=1
        if (!tryAcquire(arg) //尝试获取锁,如果获取成功,返回true;!tryAcquire(arg)返回false
            &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//只有当tryAcquire(arg)返回false,j即申请锁失败才执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
            selfInterrupt();
    }


protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            Thread current = Thread.currentThread();
            //获得state的值
            int c = getState();
            //获取写锁的数量
            int w = exclusiveCount(c);
            if (c != 0) {//说明有读锁或者写锁,此时不能申请写锁
                // 如果写锁为0(说明有读锁)||写锁不为0(说明有写锁),getExclusiveOwnerThread()返回null或者申请写锁成功的线程
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 走到这说明写锁重入
                setState(c + acquires);
                return true;
            }
            //如果c==0,说明既没有写锁也没有读锁
            if (writerShouldBlock() //返回false,写死的
                ||
                !compareAndSetState(c, c + acquires))//用CAS设置state的状态
                return false;
            setExclusiveOwnerThread(current);//设置申请成功写锁的线程
            return true;
        }


private Node addWaiter(Node mode) {
        //把当前线程封装到node节点中
        Node node = new Node(Thread.currentThread(), mode);
        //并且把node设置为尾结点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//走到这说明tail为空,说明链表为空,需要初始化链表并且插入当前node节点
        return node;
    }


private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize 
                //第一遍循环走if,把head设置为new Node())
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //第二遍循环走else,node设置为尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


写锁解锁过程


此处就像上面的写的那样了,就简单介绍几个方法


public final boolean release(int arg) {//arg=1
        if (tryRelease(arg)) {//释放锁,如果当前写锁释放成功,并且没有重入写锁,说明state的高16位为0,可以唤醒后继节点
            Node h = head;
            if (h != null && h.waitStatus != 0)//如果符合条件
                unparkSuccessor(h);//唤醒h的里后继节点
            return true;
        }
        return false;
    }


  protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())//如果记录的申请写锁的线程和释放写锁的线程不一致,抛异常
                throw new IllegalMonitorStateException();
            //释放写锁就是减一
            int nextc = getState() - releases;
            //如果此时写锁为0,free返回true,否则返回false
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);//没有写锁了就把申请写锁成功的线程设置为null
            //更新state的数值
            setState(nextc);
            return free;
        }


总结


0)本文写的真烂(自评)


1)核心类AbstractQueuedSynchronizer  (AQS)


2)高低16位的真正含义


3)看源码要跟源码,而不是傻傻的看


4)向这种要多线程运行看,读读申请写,读申请写,写申请读,写申请写等等


5)可重入锁要考虑


6)理解AQS里的链表节点的添加删除是一个核心中的核心


7)释放写锁成功后会唤醒后继节点;申请读锁成功后会唤醒后继节点(如果后继节点是读线程)


目录
相关文章
|
设计模式 Java API
StampedLock源码分析(2)
StampedLock也提供了单独读锁和写锁的封装类WriteLockView和ReadLockView,它俩存在的意义就是只讲锁的部分暴露出去,防止外部接口错误加解锁,我觉得符合软件设计模式中的单一职责和接口隔离原则。
68 1
【ReentrantReadWriteLock的实现原理】
【ReentrantReadWriteLock的实现原理】
|
Java API
StampedLock源码分析(1)
之前已经说过了ReentrantLock ReentrantReadWriteLock,可以参考之前的博客。在ReentrantReadWriteLock源码解析文末,我提到了ReentrantReadWriteLock的缺点,就是无法避免写线程饥渴的问题,而今天要说的StampedLock提供了乐观读的API,解决了写饥渴的问题。
63 0
|
5月前
|
存储 Java 开发者
synchronized源码分析解读
该文章主要探讨了Java中synchronized关键字的工作原理及其相关的源码分析,概括了synchronized关键字的基本概念、特性和其实现机制。通过源码分析进一步揭示了synchronized背后的运作原理。
AQS(abstractQueuedSynchronizer)锁实现原理详解
AQS(abstractQueuedSynchronizer)抽象队列同步器。其本身是一个抽象类,提供lock锁的实现。聚合大量的锁机制实现的共用方法。
156 0
|
设计模式 Java API
StampedLock源码分析
StampedLock源码分析
90 0
StampedLock源码分析
|
安全 Java
Java并发之AQS源码分析(二)
我在 Java并发之AQS源码分析(一)这篇文章中,从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,如果你把这部分搞明白了,再看共享锁的实现原理,思路就会清晰很多。下面我们继续从源码中窥探共享锁的实现原理。
156 0
Java并发之AQS源码分析(二)
|
存储 Java
Java并发之AQS源码分析(一)
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
127 0
Java并发之AQS源码分析(一)
|
缓存
【JUC】JDK1.8源码分析之ReentrantReadWriteLock(七)
在分析了锁框架的其他类之后,下面进入锁框架中最后一个类ReentrantReadWriteLock的分析,它表示可重入读写锁,ReentrantReadWriteLock中包含了两种锁,读锁ReadLock和写锁WriteLock,可以通过这两种锁实现线程间的同步,下面开始进行分析。
117 0
【JUC】JDK1.8源码分析之ReentrantReadWriteLock(七)