独占锁 ReentrantLock 原理
介绍
ReentrantLock 是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞然后放入到该锁的AQS阻塞队列
中。
它具有与synchronized
相同的基本行为和语义,但 ReentrantLock 更灵活、更强大,增加了轮询、超时、中断等高级功能,并且还支持公平锁和非公平锁。
从类图可以看到,ReentrantLock 还是使用 AQS 来实现的,并且可以根据参数来选择其内部是一个公平锁还是非公平锁,默认为非公平锁。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
Sync 类直接继承 AQS 类,它的子类 NonfairSync 和 FairSync 分别实现了获取锁的非公平和公平策略。
static final class NonfairSync extends Sync {} static final class FairSync extends Sync {}
在 ReentrantLock 中 AQS 的 state 状态值表示线程获取锁的可重入次数,在默认情况下,state 值为 0 表示当前所没有任何线程持有。当一个线程第一次获取该锁后,会尝试使用 CAS 将 state 设置为 1,如果 CAS 成功则当前线程获取该锁,然后记录该锁的持有者为当前线程。在该线程第二次获取该锁后,则会将 state 设置为 2,这就是可重入次数。在该线程释放锁时,会尝试使用 CAS 将 state 减 1,如果减 1 后为 0 则释放锁。
获取锁
void lock()
public void lock() { sync.lock(); }
当线程调用该方法时,如果当前锁没有被其他线程占用并且当前线程之前没获取过该锁,则当前线程获取,然后设置锁的拥有者为自己,随后设置 AQS 的 state 为 1,然后返回。
如果当前线程已经获取过该锁,则只是简单的将 AQS 的 state 加 1,然后返回。
如果该锁已经被其他线程所持有,则当前线程会进入 AQS 的阻塞队列中阻塞挂起。
在 ReentrantLock 中的 lock()方法委托给了 sync 类,sync 则根据 ReentrantLock 的构造函数选择使用 NonfairSync 或 FairSync。
我们先看看非公平锁的实现。
final void lock() { // CAS设置状态值 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 调用AQS的acquire方法 acquire(1); }
在 lock()中首先调用了 compareAndSetState 方法,因为默认 state 状态值为 0,所以第一个线程在首次调用该方法时通过 CAS 会设置为 1,随后成功获取到该锁,然后通过 setExclusiveOwnerThread 方法将锁持有者设置为当前线程。
当有其它线程通过 lock()来获取该锁时候,会 CAS 失败进入 else 调用 acquire(1)方法并且传参数为 1,下面我们再看一下 acquire 方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
之前文章讲过,AQS 没有提供 tryAcquire 方法的实现,我们看一下 ReentrantLock 重写的 tryAcquire 方法,这里我们还是看非公平锁的实现。
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //调用该方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
该方法首先查看当前状态值是否为 0,为 0 则说明锁目前是空闲状态,然后尝试 CAS 获取锁,成功后 state 设置为 1,然后锁持有者为当前线程。
如果 state 不为 0,则说明锁已经被持有,如果持有者正好是当前线程则进行 state 加 1,然后返回 true,需要注意,如果 nextc < 0 则说明锁可能溢出。
如果当前线程不是持有者则返回 false 随后加入 AQS 阻塞队列。
下面我们看一下公平锁的实现。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //公平策略 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁和非公平锁的 tryAcquire 方法不同之处就是多了一个 hasQueuedPredecessors()方法,该方法就是实现公平锁的核心代码。
public final boolean hasQueuedPredecessors() { //读取头节点 Node t = tail; //读取尾节点 Node h = head; //s是首节点h的后继节点 Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
在该方法中,因为队列是 FIFO 的,所以需要判断队列中有没有相关线程的节点已经在排队了。有则返回 true 表示线程需要排队,没有则返回 false 则表示线程无需排队。
首先我们看第一个条件h != t
,
- 头节点和尾节点都为 null,表示队列都还是空的,甚至都没完成初始化,那么自然返回 fasle,无需排队。
- 头节点和尾节点不为 null 但是相等,说明头节点和尾节点都指向一个元素,表示队列中只有一个节点,这时候也无需排队,因为队列中的第一个节点是不参与排队的,它持有着同步状态,那么第二个进来的节点就无需排队,因为它的前继节点就是头节点,所以第二个进来的节点就是第一个能正常获取同步状态的节点,第三个节点才需要排队,等待第二个节点释放同步状态。
接下来我们看第二个条件,(s = h.next) == null
,如果h != t && s == null
则说明有一个元素将要作为 AQS 的第一个节点入队,则返回 true。
接下来看第三个条件,s.thread != Thread.currentThread()
,判断后继节点是否为当前线程。
🙋🏻♀️ 举例,情况一:
h != t 返回true
,(s = h.next) == null 返回false
, s.thread != Thread.currentThread()返回false
首先 h != t 返回true
,说明队列中至少有两个不同节点存在;
(s = h.next) == null 返回false
,说明头节点之后是有后继节点存在;
s.thread != Thread.currentThread()返回 false,说明当前线程和后继节点相同;
说明已经轮到当前节点去尝试获取同步状态,无需排队,返回 false。
🙋🏻♀️ 举例,情况二:
h != t 返回true
,(s = h.next) == null 返回true
首先 h != t 返回true
,说明队列中至少有两个不同节点存在;
(s = h.next) == null 返回true
,说明头节点也就是哨兵节点之后没有后继节点;
返回 true,说明需要排队。
🙋🏻♀️ 举例,情况三:
h != t 返回true
,(s = h.next) == null 返回false
,s.thread != Thread.currentThread()返回true
首先 h != t 返回true
,说明队列中至少有两个不同节点存在;
(s = h.next) == null 返回false
,说明头节点之后是有后继节点存在;
s.thread != Thread.currentThread()返回true
,说明后继节点的线程不是当前线程,说明前面已经有人在排队了,还是得老老实实排队。
返回 true,说明需要排队。
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) throws InterruptedException { // 如果当前线程被中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试获取资源 if (!tryAcquire(arg)) //调用AQS可被中断的方法 doAcquireInterruptibly(arg); }
void lockInterruptibly()
该方法和 lock()方法类似,只不过它会对中断进行响应,就是当前线程在调用该方法时,如果他其他线程调用了当前线程的 interrupt()方法,则当前线程会抛出 InterruptedException 异常。
boolean tryLock()
public boolean tryLock() { return sync.nonfairTryAcquire(1); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
该方法尝试获取锁,如果当前锁没有被其他线程持有,则当前线程获取该锁并返回 true,否则返回 false。
📢 注意:该方法不会引起当前线程阻塞。
boolean tryLock(long timeout,TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
与 tryLock 不同之处在于,设置了超时时间,如果超时时间到还没有获取到该锁,则返回 false。
释放锁
void unlock()
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果不是锁持有者调用unlock则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果当前可重入次数为0 则情况锁持有线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
该方法首先尝试释放锁,如果 tryRelease()方法返回 true 则释放该锁,否则只是减 1,如果不是锁持有者去调用 unlock 则抛出 IllegalMonitorStateException 异常。
代码实践
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2022/1/21 * @Description 使用ReentrantLock实现简单的线程安全List */ public class ReentrantLockList { private List<String> array = new ArrayList<>(); private volatile ReentrantLock lock = new ReentrantLock(); public void add(String e) { lock.lock(); try { array.add(e); } finally { lock.unlock(); } } public void remove(String e) { lock.lock(); try { array.remove(e); } finally { lock.unlock(); } } public String get(int index) { lock.lock(); try { return array.get(index); } finally { lock.unlock(); } } }
该类在通过操作 array 之前,通过加锁来保证同一时间只有一个线程可以操作 array,但是也只能有一个线程可以对 array 元素进行访问。
总结
当同时有三个线程尝试获取独占锁 ReentrantLock 时,如果线程 1 获取到,则线程 2、3 都会被转换为 Node 节点随后被放入 ReentrantLock 对应的 AQS 阻塞队列中然后被挂起。
假设线程 1 在获取到锁之后,调用了锁创建的条件变量 1 进入 await 后,线程 1 就会释放该锁
。然后线程 1 会被转换为 Node 节点插入条件变量 1 的条件队列中。
由于线程 1 释放了锁,所以阻塞队列中的线程 2,线程 3 都会有机会获取到该锁,假如使用的是公平锁,那么线程 2 则会获取到该锁,然后从 AQS 阻塞队列中移除线程 2 对应的 Node 节点。