ReentrantLock
一、引言
并发编程在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。
作为一个在互联网公司面一次拿一次 Offer
的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。
于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写**《吊打面试官》**系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer
!
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!
二、使用
public class ReentrantLockTest { private static int count; public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Thread thread1 = new Thread(() -> { // 加锁 lock.lock(); for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " : " + count++); } // 解锁 lock.unlock(); }); thread1.setName("Thread-1"); thread1.start(); Thread thread2 = new Thread(() -> { // 加锁 lock.lock(); for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " : " + count--); } // 解锁 lock.unlock(); }); thread2.setName("Thread-2"); thread2.start(); } }
最终输出:
Thread-1 : 0 Thread-1 : 1 Thread-1 : 2 Thread-1 : 3 Thread-1 : 4 Thread-1 : 5 Thread-1 : 6 Thread-1 : 7 Thread-1 : 8 Thread-1 : 9 Thread-2 : 10 Thread-2 : 9 Thread-2 : 8 Thread-2 : 7 Thread-2 : 6 Thread-2 : 5 Thread-2 : 4 Thread-2 : 3 Thread-2 : 2 Thread-2 : 1
通过 ReentrantLock
锁让我们的线程安全的交替打印 count
的数值,那这个到底是如何实现的呢?
三、源码
从 ReentrantLock
源码来说,主要的方法如下:
- 加锁:lock、tryLock、lockInterruptibly
- 释放锁:unlock
1、ReentrantLock 结构
我们先来看一下整体如下所示:
我们简单的介绍一下每部分的结构:
// 头结点 private transient volatile Node head; // 尾结点 private transient volatile Node tail; // 状态标记(判断当前线程是否拿到锁的标记) private volatile int state; // 当前占领锁的线程 private transient Thread exclusiveOwnerThread;
我们看下 Node
节点的数据:
class Node { // 共享锁、互斥锁 static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 当前节点的等待状态,默认为0 volatile int waitStatus; // 表示线程获取锁的请求已经取消了 static final int CANCELLED = 1; // 表示线程已经准备好了,就等资源释放了 static final int SIGNAL = -1; // 表示节点在等待队列中,节点线程等待唤醒 static final int CONDITION = -2; // 当前线程处在SHARED情况下,该字段才会使用 static final int PROPAGATE = -3; // 指向前继节点的指针 volatile Node prev; // 指向后继节点的指针 volatile Node next; // 当前Node节点持有的线程 volatile Thread thread; // 暂时可忽略 Node nextWaiter; }
我们简单的过一下上面的结构,具体的使用我们后面源码里面会进行讨论。
2、Lock 源码
对于 ReentrantLock
里面的加锁源码,一共有三种类型:
- lock
- tryLock
- lockInterruptibly
我们挨个进行解读
2.1 lock()
public void lock() { // 这里的lock实现一共有两种 // 公平锁与非公平锁 sync.lock(); }
我们先来看非公平锁的代码:
final void lock() { // 尝试CAS将当前的State从0置为1 // 这里直接CAS也符合我们非公平锁的性格,不管你三七二十一,我直接CAS尝试一下能不能拿到锁 if (compareAndSetState(0, 1)) // 如果真让这哥们给拿到了,那没办法,就只能将AQS里面的占领锁的线程给设置成当前请求的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 如果这哥们没拿到,那就老老实实的走我们的逻辑 acquire(1); } // 将当前线程赋值于exclusiveOwnerThread protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
我们再看下公平锁的代码:
final void lock() { // 老老实实的走我们的逻辑 acquire(1); }
会不会突然发现,公平锁和非公平锁就差了一个上来CAS的步骤
我们继续往下看
2.1.1 acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } }
我们首先看一下 tryAcquire
方法,这个方法也分公平锁和非公平锁,但实现差距不大
2.1.2 tryAcquire
公平锁
protected final boolean tryAcquire(int acquires) { // 拿到当前的线程 final Thread current = Thread.currentThread(); // 获取当前AQS的标记位 int c = getState(); // 如果发现是0,说明当前没有人拿到锁,我可以直接去拿 if (c == 0) { // hasQueuedPredecessors:当前线程是否符合更改State的资格 // compareAndSetState:CAS将0修改为1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // AQS里面的占领锁的线程给设置成当前请求的线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果current == getExclusiveOwnerThread()的话,说明当前线程已经持有锁的,触发可重入功能 // State加一 int nextc = c + acquires; // 判断一下这哥们是不是越界了(Integer.MAX_VALUE) if (nextc < 0){ throw new Error("Maximum lock count exceeded"); } // 将State设置进去 setState(nextc); return true; } // 如果即拿不到State=0,又没办法可重入,只能抱歉,再见了您 return false; } // 若想让我们执行上面的逻辑,这里的返回值必须是false // 我们分析一下false的情况: // 当前AQS中无数据,h == t,直接返回false // 当前AQS中有数据,当前线程等于等待队列中的第一个线程,直接返回false(这里防止你后续排队的线程插队的情况) public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s = h.next; return h != t && (s == null || s.thread != Thread.currentThread()); }
非公平锁
观察一下这个非公平锁的方法,有莫有什么发现
对了,就和我们的 公平锁 差一个 hasQueuedPredecessors
的方法
人家公平锁来了之后,会先看看自己是不是排队的第一个,如果不是的话,我就不抢占state了
但非公平锁不这样想,这哥们来了,我管你是不是排队第一个,我都去CAS一把,看看能不能拿到
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 这里的代码合前面一样,我们就不再说了 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
2.1.3 addWaiter
当我们线程能够拿到 state
时,直接 true
,结束即可
但当我们拿不到 state
时,就需要继续往下走
addWaiter
就是将当前封装成 Node
插入到 AQS
中,当然插入的时候有两种情况:
- AQS数据为空:这个时候我们需要初始化完再将当前
Node
插入 - AQS数据不为空:直接将当前
Node
插入即可
// Node EXCLUSIVE:互斥锁 private Node addWaiter(Node mode) { // 哥们你既然拿不到的话,那你就封装成Node排队去吧 Node node = new Node(Thread.currentThread(), mode); // 定义一个指针,指向末尾 Node pred = tail; // 如果指针不为空的话,证明里面是有Node的 if (pred != null) { // 这里主要做的操作是将当前的node插入到队尾 // 一共分三步 // 1:将当前node的前继指针指向pred // 2:将当前tail的指向node // 3:将当前pred的后继指针指向node // 最后返回node即可 // 这里如果搞不明白的,可以自己画图演示一遍 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果当前的pred等于null,也就证明AQS里面没有数据,需要添加 enq(node); return node; } // 循环插入,一定要将当前node插入到AQS中 private Node enq(final Node node) { for (;;) { // 这里再看一下tail的值 Node t = tail; if (t == null) { // 如果还是为null的话,说明我们需要初始化了 // CAS将head指向新创建的【new Node()】 // tail也指向一下,这样我们初始化就搞好了 // 记住:这里初始化搞完,还是需要走下面的逻辑,目的是为了将当前的线程封装进去,我们之前封装的是一个虚拟头结点(这也是AQS的灵魂所在) if (compareAndSetHead(new Node())) tail = head; } else { // 到这一步就是这哥们在前面CAS抢占的时候,失败了或者初始化完需要封装当前线程 // 再这里再重复一遍上面的逻辑 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
2.1.4 acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 拿到当前node的前继节点 final Node p = node.predecessor(); // 前继节点是head:代表当前节点处于AQS第一个位置 // tryAcquire返回true:让这哥们再次去尝试一下,能不能拿到锁 // 因为这里前面没有节点了,自然而然就轮到该节点了 if (p == head && tryAcquire(arg)) { // 这哥们抢成功之后,AQS中需要把他删除 // 还记得我们前面有一个虚拟节点嘛,我们要让node替代我们的虚拟节点 // 将head指向node,将node的pre设置为null // 将p的next设置为null,让其没有链接,可达性分析直接GC掉 // 这段也需要自己画图演示一下 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果这哥们前继节点不是head的话 // 说明我们需要对当前的线程做一个挂起的操作,但是挂起之前我们需要校验一下前继Node的状态 // 如果当前的前继节点为Node.SIGNAL时,继续执行parkAndCheckInterrupt // if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } // pred:前继节点 // node:当前节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 拿到前继节点的等待状态 int ws = pred.waitStatus; // 如果前继节点Node.SIGNAL,说明前继Node是好的,直接返回true就可以了 // 这里观察前继节点的原因是因为:我们当前节点的唤醒操作是需要指望前继节点的 if (ws == Node.SIGNAL) return true; // 如果前继节点状态为1的话,证明前继节点获取锁的请求已经取消了 // 也就是该节点已经无效了 if (ws > 0) { do { // 我们当前节点需要自我救赎一下,不能你这个前继节点完蛋了,当前节点也跟着完蛋吧 // 怎么救赎呢? // 我们当前节点一直向前探索,直到找到能用的前继节点为止 // 然后将pred挂在这个有效的前继节点上,前继节点的next挂到当前线程 // 这部分操作完成之后,中间无效的节点可直接被GC回收掉 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果到这里,说明我们的状态如下:节点线程等待唤醒 // 那么我们CAS将该节点的状态修改为Node.SIGNAL // 这样的话,我们当前节点可以保证前继节点是有效的 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 非有效的情况下,均返回false return false; } // 直接调用底层park指令将当前线程挂起 private final boolean parkAndCheckInterrupt() { // 这一块会将系统挂起,等到前继节点执行unpark指令将该线程唤醒 LockSupport.park(this); return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
2.1.5 selfInterrupt
当我们的 acquireQueued
方法执行 selfInterrupt
返回 true
时,这个时候才会执行我们的 selfInterrupt
方法
这个地方特别有意思,博主看了好多文章才参透其中的奥秘
首先,我们看什么时候会返回 true
private final boolean parkAndCheckInterrupt() { // 这一块会将系统挂起,等到前继节点执行unpark指令将该线程唤醒 LockSupport.park(this); // 这里返回true的时候才会执行selfInterrupt方法 return Thread.interrupted(); }
所以,我们先来解释一下 Thread.interrupted()
是什么意思以及什么情况下会返回 true
对于线程来说,执行中断的标记一共有三个方法:
- interrupt:是给线程设置中断标志
- interrupted:是检测中断并清除中断状态
- isInterrupted:只检测中断
所以,我们上面的 Thread.interrupted()
返回 true
的时机是当前线程处于中断标记,执行之后会清除其中断状态
我们一起来看下 selfInterrupt
的执行
static void selfInterrupt() { Thread.currentThread().interrupt(); }
咦,你突然的会发现,我靠,这个地方干嘛又给他设置中断状态了,这么奇怪。
这部分属于Java提供的协作式中断知识内容,这里简单介绍一下:
- 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过
Thread.interrupted()
方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False
),并记录下来,如果发现该线程被中断过,就再中断一次。 - 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。
关于 Java协作式中断
,可以参考下:https://blog.csdn.net/reliveIT/article/details/49920205
到这里,我们整个的 lock
流程就结束了
3、unLock 源码
我们释放锁的源码里面没有公平锁和非公平锁区分
// 解锁 lock.unlock(); public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
3.1 tryRelease
- 当前该线程是否为占用state的线程,不是则抛出异常
- 如果是的话则将state减一,看其是否为0(是否仍然有线程持有该锁)
// releases = 1 protected final boolean tryRelease(int releases) { // 获取当前的State减一 int c = getState() - releases; // 比较当前的线程与获取锁的线程是不是同一个 if (Thread.currentThread() != getExclusiveOwnerThread()){ // 不是同一个,则抛出异常 throw new IllegalMonitorStateException(); } boolean free = false; // 如果当前state为0的话,则代表没有线程持有锁 if (c == 0) { // 将空闲标记置为true free = true; // 将持有锁的线程设置为null setExclusiveOwnerThread(null); } // 设置state setState(c); // 如果当前没有线程持有state,这里会返回true // 如果仍然存在线程持有state,这里会返回false return free; }
3.2 unparkSuccessor
当我们上面没有线程持有 state
时,会走向下面的代码:
public final boolean release(int arg) { if (tryRelease(arg)) { // 设置h引用指向head Node h = head; // 当前AQS中有数据并且头指针的节点是有效的 if (h != null && h.waitStatus != 0) // 唤醒节点 unparkSuccessor(h); return true; } return false; }
那这个唤醒节点到底是如何操作的呢?我们一起来看一下:
// node = head private void unparkSuccessor(Node node) { // 拿到当前head节点的等待状态 int ws = node.waitStatus; // 如果小于0,先CAS将其Status改为0 // 这里修改的原因:如果我们不修改的话,在并发情况下,这里会频繁的去唤醒后续的节点 // 有点类似我们之前kafka的mute,唤醒一个我就关闭一下,唤醒一个关闭一下 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 拿到头结点的下一个节点(正式节点) Node s = node.next; // 如果下一个节点是空的或者节点无效的,需要执行下述操作 if (s == null || s.waitStatus > 0) { // 无效的话,直接将节点置空就可以了 s = null; // 从后往前找到状态小于等于0的节点 // 找到离head最新的有效节点,并赋值给s for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 当前节点不等于空的情况下,直接unpark唤醒即可 if (s != null) LockSupport.unpark(s.thread); }
这里简单的来介绍下从后往前找:
上面我们可以知道,当 下一个节点是空的或者节点无效的
的时候,会触发向前寻找的操作
比如上面这种情况,如果我们继续从头结点开始寻找的话,实际上我们是找不到后面的节点的(在黄色null已经空指针了)
所以,我们需要从 tail
指针,从后往前找到符合我们要求(有效)的节点,也就是 蓝色
的节点。
然后将其唤醒即可。
四、流程图
高清图可私聊博主获取
五、写在最后
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。