二、AQS对条件变量的支持
1、条件变量
之前的推文中,讲解过notify和wait是配合synchronized内置锁来实现线程间同步的,而条件变量的signal和await方法则是通过AQS来实现线程间同步的。
两者的不同之处在于:
synchronized同时只能与一个共享变量的notify或wait方法实现同步
AQS的一个锁可以对应多个条件变量的signal或await方法实现同步
共享变量我们知道,那啥是条件变量呢?来看看下面的例子:
package AQSLearn; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by Zhong Mingyi on 2020/12/10. */ public class ConditionTest { private static ReentrantLock reentrantLock = new ReentrantLock(); private static Condition condition = reentrantLock.newCondition(); public static void main(String[] args) throws InterruptedException { new Thread(()->{ reentrantLock.lock(); try { System.out.println("thread1开始等待"); condition.await(); System.out.println("thread1被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); }finally { reentrantLock.unlock(); } },"thread1").start(); Thread.sleep(1000); new Thread(()->{ reentrantLock.lock(); System.out.println("thread2开始唤醒"); condition.signal(); System.out.println("thread2唤醒完成"); reentrantLock.unlock(); },"thread2").start(); } }
上述代码中:
- 首先,创建了ReentrantLock对象,它是基于AQS实现的锁。
- 然后,调用ReentrantLock对象的newCondition方法创建了一个Condition对象,这个Condition就是ReentrantLock锁对应的条件变量;一个ReentrantLock对象可以创建多个Condition对象。
- 接下来,线程thread1调用reentrantLock.lock()方法获取独占锁,然后调用condition.await()方法阻塞挂起自己。
- 线程thread2调用reentrantLock.lock()方法获取独占锁,然后调用condition.signal()方法唤醒了因为调用了该条件变量的await方法而阻塞的线程thread1。
- 线程thread1从await方法处返回,继续执行后续逻辑。
2、条件队列
想知道内部的实现原理,来看看源码是怎么做的?
reentrantLock.newCondition()的作用其实是创建了一个在AQS内部声明的对象ConditionObject,由于ConditionObject是AQS的内部类,所以它可以访问AQS内部的变量和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await方法时被阻塞的线程。
我们来看下condition.await();方法的实现。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//创建node节点,插入到条件队列队尾 int savedState = fullyRelease(node);//释放当前线程获取的锁 int interruptMode = 0; while (!isOnSyncQueue(node)) {//调用park方法阻塞当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
当线程调用条件变量的await 方法时(必须先调用锁的lock方法获取锁),在内部会构造一个类型为 Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁,并被阻塞挂起。这时候如果有其他线程调用lock方法尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await方法处阻塞。
再来看下condition.signal();方法的实现:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
当另外一个线程调用共享变量的signal方法时(必须先调用锁的lock方法获取锁),会把条件队列里队头的一个线程节点从条件队列里移除并放入AQS的阻塞队列里面,然后激活这个线程。
最后再来看看当一个线程调用条件变量的await方法被阻塞后,是如何进入条件队列的?
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
其实学习了AQS队列的插入后,理解其条件队列的插入就更加简单了。首先是创建一个类型为Node.CONDITION的节点,然后单向地向条件队列尾部插入一个元素。
注意到:当多个线程同时调用lock方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
3、小结一下
当一个线程调用了条件变量的await方法后(必须在获取锁之后),会被转成Node节点放入到条件队列中,并释放获取的锁。
当另一个线程调用了条件变量的signal或signalAll方法时(必须在获取锁之后),会把条件队列里的一个或全部Node节点移动到AQS的阻塞队列中,等待时间获取锁。