构造方法分析
//无参构造方法调用有参构造方法,并且传入一个参数为false public ReentrantReadWriteLock() { this(false);//----->跟进去 }
//fair为false,表示创建一个非公平的AQS public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); //创建一个ReadLock,并且把this(ReentrantReadWriteLock)当做参数传入 readerLock = new ReadLock(this);//----->跟进去 writerLock = new WriteLock(this); }
//参数lock为上面创建的ReentrantReadWriteLock protected ReadLock(ReentrantReadWriteLock lock) { //此时把ReentrantReadWriteLock里的属性sync引用赋值给ReadLock里的属性sync sync = lock.sync; }
到这其实构造方法就已经看完了,此处想表达的一点是:(ReentrantReadWriteLock里的属性sync)、(reentrantReadWriteLock.readLock的属性sync)和(reentrantReadWriteLock.writeLock的属性sync)是同一个sync(extends AbstractQueuedSynchronizer)
ReentrantReadWriteLock的规定
A获取读锁后,A可以获取读锁
A获取读锁后,A不可以获取写锁
A获取写锁后,A可以获取读锁
A获取写锁后,A可以获取写锁
A获取读锁后,B可以获取读锁
A获取读锁后,B不可以获取写锁
A获取写锁后,B不可以获取读锁
A获取写锁后,B不可以获取写锁
综上:在不考虑重入的情况下,如果A获得读锁,那么其他线程可以获得读锁,但是不能获取写锁(阻塞);如果A获得写锁,那么其他线程读写锁都获取不到(阻塞)
读写高低16位
AQS 的状态state是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。
//高低16位,这个就是一常量 static final int SHARED_SHIFT = 16; //上读锁需要在高16位添加1,也就是需要添加一个17位(1后面16个0)的数 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //读锁或者写锁运行最大的上锁数,以写锁为例,最大是16位,那么写锁的最大值为就是15个1 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //state变量 & 这个值就能获得低16的数值 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
举一个例子
初始化state = 0,其中1-16号代码写锁,17-18号代表读锁
在state=0的基础之上加一个写锁,其实就是state+1
在state=0的基础之上加一个读锁,其实就是state+65536(二进制是1后面16个0)
获取读锁
把state向右移16位,如下图所示,就能获得读锁的数量,
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
获取写锁 ,state & EXCLUSIVE_MASK 就能获取第16位的值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读锁上锁过程
在没有读锁和写锁的情况申请读锁
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); //在没有读锁和写锁的情况下申请读锁 reentrantReadWriteLock.readLock().lock();//----->跟lock方法 //释放读锁 reentrantReadWriteLock.readLock().unlock();
public void lock() { sync.acquireShared(1);//----->跟进去,参数为1 }
public final void acquireShared(int arg) {//arg=1 if (tryAcquireShared(arg) < 0)//----->跟进去,arg=1 doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //获取AQS里的属性state,初始化为0 int c = getState(); //exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0 //sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0 exclusiveCount(c) != 0 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)getExclusiveOwnerThread()记录上读锁的线程,如果没有读锁,返回null return -1; int r = sharedCount(c); if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1 if (r == 0) { //略。。。 } else if (firstReader == current) { //略。。。 } else { //略。。。 } return 1;//当前情况下最终的返回结果 } return fullTryAcquireShared(current); }
public final void acquireShared(int arg) {//arg=1 if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法 doAcquireShared(arg); }
此时上读锁成功
锁上读锁后再申请读锁的情况
多线程断点调试
new Thread(() -> { reentrantReadWriteLock.readLock().lock(); try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } System.out.println("t1 logic "); reentrantReadWriteLock.readLock().unlock(); }, "t1").start(); new Thread(() -> { reentrantReadWriteLock.readLock().lock(); try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } System.out.println("t2 logic "); reentrantReadWriteLock.readLock().unlock(); }, "t2").start();
public void lock() { sync.acquireShared(1);//----->跟进去,参数为1 }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//----->跟进去,arg = 1 doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); //我们的假设的是原来有一把读锁,那么c=state=65536 int c = getState(); //exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0 //sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1 if (r == 0) { //略。。。 } else if (firstReader == current) { //略。。。 } else { //略。。。 } return 1;//最后返回的结果 } return fullTryAcquireShared(current); }
public final void acquireShared(int arg) {//arg=1 if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法 doAcquireShared(arg); }
此时上读锁成功
锁上写锁后再申请读锁的情况
多线程断点调试
1. new Thread(() -> { reentrantReadWriteLock.writeLock().lock(); try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } System.out.println("t1 logic "); reentrantReadWriteLock.writeLock().unlock(); }, "t1").start(); new Thread(() -> { reentrantReadWriteLock.readLock().lock();//----->跟进去,lock方法 try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } System.out.println("t2 logic "); reentrantReadWriteLock.readLock().unlock(); }, "t2").start();
public void lock() { sync.acquireShared(1);----->跟进去,参数为1 }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)----->跟进去,arg=1 doAcquireShared(arg); }
protected final int tryAcquireShared(int unused) { //获得当前线程 Thread current = Thread.currentThread(); //获得state的值 int c = getState(); if (exclusiveCount(c) != 0 //假设条件为有读锁,所以exclusiveCount(c) != 0 为真 && getExclusiveOwnerThread() != current) //AQS会记录申请成功的读锁的线程,如果申请写锁成功的线程再次申请读锁(可重入),getExclusiveOwnerThread() != current 为假, 否则为真,此处为真。 return -1;//最后的返回结果 //略 }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//----->跳出来,tryAcquireShared(arg)=-1 doAcquireShared(arg);//----->跟进去,arg=1 }
//把当前线程封装到node节点中,并且插入到AQS的链表尾,再一次尝试获取锁,如果获取成功,把当前节点设置为头节点;否则就告诉当线程挂起。 private void doAcquireShared(int arg) { //给AQS里的链表添加节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //获得当前节点的前驱节点 final Node p = node.predecessor(); if (p == head) {//如果是头节点 //尝试获取锁 int r = tryAcquireShared(arg); if (r >= 0) {//如果获取成功 setHeadAndPropagate(node, r);//设置当前节点为头节点,并且唤醒后继的读节点 p.next = null; if (interrupted) selfInterrupt(); failed = false; return;//方法的唯一出口,但是你要明白中间是当前线程是有挂起的 } } if (shouldParkAfterFailedAcquire(p, node)//设置node的prev节点的一个参数,告诉prev需要唤醒node && parkAndCheckInterrupt())//调用LockSupport.park(thred)挂起线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
现在为止,其实就该线程就已经挂起了(并且把当前线程存到node节点中),因为前面有写锁,所以不能获取锁。等写锁释放后就会调用LockSupport.unpark()继续运行上面代码的线程。
读锁解锁过程
reentrantReadWriteLock.readLock().lock(); try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { } System.out.println("t1 logic "); reentrantReadWriteLock.readLock().unlock();----->跟进去.unlock()方法
public void unlock() { sync.releaseShared(1);//----->跟进去,参数为1 }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//----->跟进去,参数为1 doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int unused) { //获取当前线程 Thread current = Thread.currentThread(); if (firstReader == current) { //略。。。 } else { //略。。。 } for (;;) { //获得state的数据 int c = getState(); //释放读锁就是在高16位减1,即c-65536 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc))//CAS方式设置state的新值 //nextc即state的值,state==0,表明无读锁和无写锁 return nextc == 0; } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//----->跳出来,如果state==0,执行下面方法运行其它线程加锁,否则就不执行 doReleaseShared();//如果state==0,唤醒AQS里链表里的后继节点 return true; } return false; }
此时读锁释放锁成功,其实逻辑很简单,用CAS算法给state的高16位减1,如果state==0,唤醒后继节点,否则什么也不做。
写锁上锁过程
此处就像上面的写的那样了,就简单介绍几个方法
public final void acquire(int arg) {//arg=1 if (!tryAcquire(arg) //尝试获取锁,如果获取成功,返回true;!tryAcquire(arg)返回false && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//只有当tryAcquire(arg)返回false,j即申请锁失败才执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法 selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { //获取当前线程 Thread current = Thread.currentThread(); //获得state的值 int c = getState(); //获取写锁的数量 int w = exclusiveCount(c); if (c != 0) {//说明有读锁或者写锁,此时不能申请写锁 // 如果写锁为0(说明有读锁)||写锁不为0(说明有写锁),getExclusiveOwnerThread()返回null或者申请写锁成功的线程 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 走到这说明写锁重入 setState(c + acquires); return true; } //如果c==0,说明既没有写锁也没有读锁 if (writerShouldBlock() //返回false,写死的 || !compareAndSetState(c, c + acquires))//用CAS设置state的状态 return false; setExclusiveOwnerThread(current);//设置申请成功写锁的线程 return true; }
private Node addWaiter(Node mode) { //把当前线程封装到node节点中 Node node = new Node(Thread.currentThread(), mode); //并且把node设置为尾结点 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//走到这说明tail为空,说明链表为空,需要初始化链表并且插入当前node节点 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //第一遍循环走if,把head设置为new Node()) if (compareAndSetHead(new Node())) tail = head; } else { //第二遍循环走else,node设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
写锁解锁过程
此处就像上面的写的那样了,就简单介绍几个方法
public final boolean release(int arg) {//arg=1 if (tryRelease(arg)) {//释放锁,如果当前写锁释放成功,并且没有重入写锁,说明state的高16位为0,可以唤醒后继节点 Node h = head; if (h != null && h.waitStatus != 0)//如果符合条件 unparkSuccessor(h);//唤醒h的里后继节点 return true; } return false; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//如果记录的申请写锁的线程和释放写锁的线程不一致,抛异常 throw new IllegalMonitorStateException(); //释放写锁就是减一 int nextc = getState() - releases; //如果此时写锁为0,free返回true,否则返回false boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null);//没有写锁了就把申请写锁成功的线程设置为null //更新state的数值 setState(nextc); return free; }
总结
0)本文写的真烂(自评)
1)核心类AbstractQueuedSynchronizer (AQS)
2)高低16位的真正含义
3)看源码要跟源码,而不是傻傻的看
4)向这种要多线程运行看,读读申请写,读申请写,写申请读,写申请写等等
5)可重入锁要考虑
6)理解AQS里的链表节点的添加删除是一个核心中的核心
7)释放写锁成功后会唤醒后继节点;申请读锁成功后会唤醒后继节点(如果后继节点是读线程)