2. 从ReentrantLock到AQS
没看过AQS,ReentrantLock 总该了解点吧,有道是知兄莫如弟,那么我们就由其入手,旁敲侧击。
简述
描述一下ReentrantLock的背景:
我们都知道 synchronized
关键字是用于加锁,但是这种锁对于性能影响比较大,因为线程在获取资源时必须处于等待状态,没有额外的尝试机制。所以在jdk1.5 的时候,java 提供了 ReentrantLock ,用于替代 synchronized.
ReentrantLoack 具有可重入,可中断,可限时,公平锁非公平锁等特点。
ReentrantLock 的简单使用:
val lock = ReentrantLock() lock.lock() //加锁 //业务逻辑 lock.unlock() //释放
与AQS的关系
看到这,你可能会说,你说了那么多,那它到底和 AQS
有啥关系?请看下图所示:
ReentrantLock 内部有一个抽象内部类 Sync 继承了 AbstractQueuedSynchronizer ,默认构造函数中又实例化了 NonfairSync 类 (Sync 的子类)。对于外部而言,只关注 lock 与 unlock 方法,但实际上内部都是调用了 AbstractQueuedSynchronizer 的方法。
流程剖析
看了上面的图,只是为加深一个印象,那就是 ReentrantLock 中用到了 AQS ,接下来我们通过下面这个简单流程解析,来看一下 AQS 在 ReentrantLock 的运用以及其原理。
为了便于理解,我们整个流程都是以 NonfairSync 即不公平锁的伪源码为例(公平非公平差距并不大)。
lock()
从加锁方法开始,如下:
class NonfairSync .. -> fun lock() -> { 👉 1. if (compareAndSetState(0, 1)) 👉 2. setExclusiveOwnerThread(Thread.currentThread()) else 👇 acquire(1) //独占模式获取 fun acquire(arg:Int=1) -> { //尝试获取 && 将当前线程添加到等待队列中,并通过CAS的方式不断尝试获取前一个节点 👉 3. if (!tryAcquire(arg) && 👉 4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg) )
1.compareAndSetState
这个方法的意思是尝试获取锁,其内部的操作如下:如果当前状态值等于期望值,则以原子方式将同步状态设置为给定的更新值。
也就是说,当前我们预估值为0,即我们预估当前没有线程占用资源,如果操作时,发现 这个要实际操作的值真的是0,也就是当前资源并没有其他线程占用,那么我们就将其更新为1,表示当前资源已经被占用。
而 AQS 内部正是有一个 int 型变量 state ,其作用正是代表当前加锁状态。
private volatile int state;
当线程尝试获取锁成功后,如果同一个线程再次尝试获取锁呢?我们称之为锁的重入,那怎么做呢?总不能我自己再获取一把锁?不可能吧,对于一个资源,怎么可能生成两把锁被同一个线程占用。离谱!那怎么办呢?
这时候就轮到 setExclusiveOwnerThread 方法了,我们看看它的实现。
2.setExclusiveOwnerThread
protected final void setExclusiveOwnerThread(Thread thread) - { exclusiveOwnerThread = thread
内部是设置了当前的线程对象,而这个 exclusiveOwnerThread
正是 AQS
另一个变量,代表了 当前拥有锁的线程 。这个在哪里用呢,我们看下面方法。
3.tryAcquire
这个方法的含义是以不公平的方式去获取锁,其伪代码如下:
fun tryAcquire(acquires:Int=1) -> { 👇 fun nonfairTryAcquire(acquires):Boolean -> { val current = 当前线程对象 val c = getState() 1. 👉 if(c== 0 && compareAndSetState(0, acquires)) setExclusiveOwnerThread(current) return true 2. 👉 else if (current= AQS中当前占用资源的线程对象) AQS中持有的state += acquires return true return false
当调用 nonfairTryAcquire 获取锁时,内部的操作很简单:首先获取当前的线程对象与 当前 AQS
中储存的 state
状态值,
1.如果当前state=0 并且 通过 compareAndSetState 方法尝试修改 state 成功 则代表当前资源没有线程占用,然后就设置当前拥有锁的线程为当前自己。
2.如果 当前占用资源的线程是自己,那么对 AQS 中的 state+1 ,然后返回true,即代表当前线程获取锁成功。
如果 return false,则代表当前线程获取锁失败。
为什么这里当获取锁的时候是同一个线程就要 state+1
呢?
我们都知道,使用 ReentrantLock
时,我们释放锁调用的是 unLock ,那么我们的切入点就在这了。
4.acquireQueued
这个方法是以死循环的方式不断获取锁,内部代码如下:
fun acquireQueued(node:Node, arg:Int=1):Boolean -> { //当前是否成功拿到资源 var failed = true try { //是否在等待过程中被中断过 val interrupted = false //自旋开始,要么获取锁, while (true) { val p = 前一个node节点 //如果前一个节点是head节点并且修改state成功,则表明当前线程已获得锁 if (p == head && tryAcquire(arg)) { //设置新的头结点 setHead(node) //将之前的头结点置null,便于GC回收 p.next = null // help GC failed = false return interrupted } //获取锁失败时调用 //如果通过前驱结点判断发现当前线程被阻塞并且当前线程已经被中断,则修改 interrupted 标记 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true } } finally { if (failed) cancelAcquire(node) } }
总结
当我们调用 lock() 方式加锁时(以不公平锁为例),内部先是以原子方式去尝试修改 AQS 中持有的 state 变量:
1.如果修改成功,则代表当前资源无线程占用,则获取锁成功,并且将当前 AQS 中的 exclusiveOwnerThread 更新为当前线程对象,以便于后期锁重入时直接返回获取锁成功。
2.如果最开始修改失败,则调用 acquire 方法去获取锁。 其方法会内部 尝试获取锁一次 ,并且将当前线程添加到等待队列中,然后通过CAS的方式不断自旋,一直获取 父node 节点, 如果 父node 节点是 head头节点 ,就说明当前节点在 队列首部 ,就尝试获取锁,如果获取成功,则更新队列头节点为当前node,并移除当前遍历的父节点。如果获取锁失败,则通过前驱节点判断当前线程是否被阻塞,如果当前线程已经被中断,则更新标记位,并且暂停此循环,等待唤醒。
看完了 lock() 方法的简单分析,是不是觉得感觉自己上错了车,上面只是简单做了一个流程分析,如果细追下去,其中的细节还很深,可能就不是本文所能全部概述,我们只需要知道大体流程即可。
unLock()
1.👇 fun unLock() -> { 2.👇 fun release(arg:Int=1):Boolean -> { // 如果 -tryRelease- 结果为true,则唤醒正在等待的队列,即让其他线程获取锁。 👉 if (tryRelease(arg)) ... unparkSuccessor(h) 3.👇 fun tryRelease(arg=1):Boolean -> { c = AQS中state变量 - arg .. if (c == 0) { //设置AQS中持有的线程为null setExclusiveOwnerThread(null) return true } .. setState(c) ..
如上所述伪代码, **unLock ** 方法调用顺序如下,在调用 unLock 方法进行释放锁时,内部其实调用了 relase 方法,其内部又调用了 tryRelease 方法,其内部先是使用 AQS 中的 state 变量-arg(1) ,如果当 c=0 ,则表明当前已经没有线程占用资源,则去唤醒正在等待中的队列,也就是让其他线程开始获取锁。
串一遍思路(非公平锁)
当我们调用 lock 方法时,先是尝试以原子的方式去修改 AQS 内部的state变量值,如果当前 state 值与预期值一致,则更新 AQS 内部state 的变量值为 1 ,并将当前线程对象的引用赋值给 AQS 。
如果在尝试修改 state 变量值的时候失败了,则调用 acquire(xx) 去获取锁,在方法内部将自己添加到当前等待队列中,并且以 CAS 的操作不断自旋,不断尝试去获取当 父node节点 的前一个节点是否等于 head节点 ,并且当前线程是否已经尝试拿到锁,如果前一个节点等于 head节点 并且当前修改 state 变量成功,则代表当前线程已经拿到锁,则将 当前node 节点置为头结点,并移除其前一个节点。当然,AQS 对这个做了很多处理,它并不会一直重复上述重试操作,当经历一段自旋后,它就会以线程中断的方式停止下来,并且取消当前的尝试。
通过理一遍 ReentrantLock 的源码,我们大致了解了一下整个流程,及相应方法的具体职责,这对我们理解 AQS 将起到一些重要的作用。以及自定义一个 自己的重入锁 也将会有帮助。
3. 用AQS写一个重入锁
锁的可重入
指的是当某个线程调用某个方法或者对象获取了一把锁时,再次调用了指定方法,导致的锁的重入。即本身已经获取到了锁,又一次经历了锁的获取,一般情况下,我们会在再次进入时判断当前线程是否获取了锁,如果获取了,就修改同步状态,即 AQS 中的 state+1 。为什么要state+1 ,因为释放锁的时候需要-1啊。
具体代码如下:
4. AQS于我们的日常
说实话,不会使用 AQS ,并不会影响开发任何,在Android开发的现在,各种线程相关的工具库,Rx , 协程 ,都是在降低开发难度,但作为基础,我们还是应该明白有些底层的设计思想,当你或许有一天想要自己去定义一个特定规则的线程工具时,这些看上去好像对我们实际用处不大的东西就都会派上用场。
任何东西的学习,都免不了一个 为什么 ?
比如为什么加了 synchronized 就可以加锁,为什么 ReentrantLock 是可重入呢,当你想要搞清楚这些原因的时候,这些看起来晦涩的东西就是唯一入口。