1、Condition定义
Condition是一个接口,定义在juc中(java.util.concurrent.locks.Condition),它的主要功能类似于wait()/notify(),但是Condition其实现比wait()/notify()使用更加灵活,简洁、适用场景更加丰富。
2、Condition之于Lock与wait()/notify()之于synchronized
2.1 wait()/notify()之于synchronized
java.lang.Object中定义了一组监视器方法,例如wait()、wait(long timeout)、wait(long timeout, int nanos)、notify()、notifyAll()而object是任何对象的超类,所以任意java对象都拥有这组监视器方法。这些方法再配合上synchronized同步关键字,我们就可以实现等待/通知机制。
2.2 Condition之于Lock
Condition接口中提供了await()、await(long time, TimeUnit unit)、signal()、signalAll()等方法的定义,这些方法的实现配合上Lock也可实现等待/通知机制。
2.3 Object监视器方法与Condition接口两者简要对比
注意:在Condition对象中,与wait()、notify()、和notifyAll()对应的方法分别是await()、signal()、signalAll()方法。Condition对Object进行了扩展(隐藏关系),所有Condition包含wait()和notify()方法。
对于上述对比中最重要的区别在于:
Condition提供更加丰富的wait()机制,例如基于指定时间的限时等待
对于每个Lock,可以存在任意数量的Condition对象。
3、Condition接口定义与使用案例
3.1 Condition的接口定义
Condition接口中定义的方法如下,具体方法定义的含义请看方法上的注释
3.3 使用案例
通过Lock和Condition来实现一个有界缓存队列,生产线程的向队列中添加数据,当队列满了的时候put()操作会被阻塞;反之,消费线程不断的从队列中取出数据,当队列为空时,take()操作会被阻塞。
package com.lizba.p6; import org.omg.CORBA.Object; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * <p> * 使用Condition来实现的一个有界队列示例代码 * </p> * * @Author: Liziba * @Date: 2021/6/26 16:03 */ public class ConditionBoundedBuffer<E> { /** 定义数组做有界队列容器 */ final E[] items; /** 可重入锁ReentrantLock() */ final Lock lock = new ReentrantLock(); /** 条件:数组不满 notFull ( count < item.length) */ final Condition notFull = lock.newCondition(); /** 条件:数组不为空 notEmpty ( count > 0) */ final Condition notEmpty = lock.newCondition(); /** * tail游标,记录当前插入元素到了哪个位置 * head游标,记录当前获取元素到了哪个位置 * count,记录容器中的元素的个数 * */ private int tail, head, count; public ConditionBoundedBuffer(int size) { this.items = (E[]) new Object[size]; } /** * 添加元素操作 * @param e * @throws InterruptedException */ public void put(E e) throws InterruptedException { lock.lock(); try { // 当数组满时,调用 notFull.await();使得插入元素的线程阻塞 while (count == items.length) { notFull.await(); } items[tail] = e; if (++tail == items.length) { tail = 0; } ++count; // 唤醒获取元素的线程 notEmpty.signalAll(); } finally { lock.unlock(); } } /** * 获取元素操作 * @return * @throws InterruptedException */ public E take() throws InterruptedException { lock.lock(); try { // 当数组为空时,调用notEmpty.await();使得获取元素的线程阻塞 while (count == 0) { notEmpty.await(); } E ret = items[head]; items[head] = null; if (++head == items.length) { head = 0; } --count; // 唤醒插入元素的线程 notFull.signalAll(); return ret; } finally { lock.unlock(); } } }
4、Condtion 实现源码分析
4.1 互斥锁和读写锁中Condition的构造
4.1.1 AbstractQueuedSynchronizer中的ConditionObject
ReentrantLock与ReentrantReadWriteLock中的静态内部类Sync继承了AbstractQueuedSynchronizer,两者调用的sync.newCondition(),实际上调用的是new ConditionObject(),也就是构造的AbstractQueuedSynchronizer中的ConditionObject对象。
4.2 await()源码分析
4.2.1 await()位于AbstractQueuedSynchronizer中的ConditionObject
调用Condition的await()或者awaitXxxx()会导致线程构建成Node节点加入Condition的等待队列,并且释放锁。如果线程从await()或者awaitXxxI()方法返回,表明线程又重新获取了Condition相关的锁。
4.2.2 总结
await()方法在调用之前,线程一定获取到了锁,因此addConditionWaiter()无需CAS也可以保证线程安全
在阻塞自己之前,必须先释放锁fullyRelease(node),防止死锁
线程从wait中被唤醒后,必须通过acquireQueued(node, savedState)重新获取锁
isOnSyncQueue(node)用于判断节点是否在AQS同步队列中(关于同步队列和等待队列文章后面有图解),如果从Condition的等待队列移动到了AQS的同步队列证明被执行了signal()
LockSupport.park(this)阻塞自己之后,线程被唤醒的方式有unpark和中断,通过checkInterruptWhileWaiting(node)判断当前线程被唤醒是否是因为中断,如果中断则退出循环
4.3 signal()源码解析
调用Condition的signal()方法,会唤醒在Condition等待队列中的线程节点(唤醒的是等待时间最长的首节点),唤醒节点之前会将其移至同步队列中(这里要注意先加入同步队列在唤醒该节点,等会画图别混淆)。
5、Condition实现原理图解
5.1 图解同步队列与等待队列
在文章开头第二大点介绍Condition之于Lock与wati()/notify()之于synchronized时,我们对比过二者,其中很大的一个区别在于Object的监视模型上,一个对象只拥有一个同步队列和等待队列,这样的模型一个很大的问题在于它不太适用于编写带有多个条件谓词的并发对象(可以简单理解为复杂的带高级功能的);而并发包中的Lock中的组合了Condition对象,使得其可以拥有一个同步队列和多个等待队列(一个Condition中有一个等待队列)。下面就通过图来说明Condition的等待队列和同步器中的同步队列和等待队列之间的关系。
Condition等待队列图示
5.2 图解await()方法如何加入等待队列
前面讲过,调用await()方法的前提是获取到了Lock对应的锁,也正是因为这个await()操作是在获取锁的前提下进行的,所以节点的构造并未使用CAS,因为它的前提条件就是线程互斥(安全)的;同时我们在讲述AQS、ReentrantLock和ReentrantReadWriteLock时讲述过其线程竞争锁资源失败,线程将会被构造成同步节点,加入AQS的同步队列中,等待后续的再次竞争或者中断退出等。上图也讲过了同步器AQS中的同步队列和Condition中的等待队列之间的关系,所以加入Condition等待队列的线程,可以理解为在AQS同步器中重新获取到锁的首节点线程被移植(这里的移植不是将以前的节点加入,是通过以前节点的信息构造一个新的线程节点加入到等待队列)到了Condition的等待队列中,其图如下。
5.3 图解signal()方法如何移出等待队列
signal()方法会将等待队列中的等待时间最长的节点(首节点),移动到同步队列尾部,加入同步队列的代码是 enq(final Node node),通过CAS来线程安全的移动。移动完成之后线程再使用LockSupport.unpark(node.thread);唤醒该节点中等待的线程。线程节点被移动至同步队列中后,线程可以参与同步状态的竞争,如果竞争成功,线程将会从await()方法返回。其图解如下。6、总结
Condition中的await()/signal()相比Object中的wait()/notify(),Condition拥有更多的高级特性能够实现更加复杂的等待线程集的场景。但是我们在使用Lock和Condition时在调用await()和signal()要注意必须持有Lock对象(尽管在Lock对象中定义的具体实现,构造一个Condition可以不满足持有Lock对象这个条件)。