Condition 实现原理
说 Condition 前,需要说下 ConditioObject。ConditionObject 是同步器 AbstractQueuedSynchronzied 的内部类,因为 Condition 的操作需要关联的锁。ArrayBlockingQueue 就是 Condition 的具体应用。Object 中其实 也有 wait ,notify ,notifyAll 等操作, Condition 相当于将 wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为。
应用场景 ArrayBlockingQueue
ArrayBlockingQueue 的构造函数。
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
通过构造函数,可以看到 Condition 的创建时需要关联锁的。
从队列中去取出(take)数据 。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
往队列中加入数据 enqueue
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
可以看主要用了 await signal 等方法。具体代表什么含义?
Condition 实现主要包含三个部分:等待队列、等待、通知。
如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。
等待队列
等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。等待队列结构如下:
Condition 等待队列,也是包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入等待队列并进入等待状态。
Object 监视器模型
Object 监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:
等待
当调用 Condition 的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 当前线程加入等待队列 Node node = addConditionWaiter(); // 释放同步状态,也就是释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // node 不在节点中会一直 park 阻塞下去。达到等待的效果。 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。但是如果不是通过 Condition.signal 进行唤醒的,而是对等待线程进行中断,那么会抛出 InterruptedException。
调用 Condition signal 方法后,当前线程会加入到等待队列,如下图所示:
通知
调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively 检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。
节点从等待队列,移动到同步队列的操作过程如下:
通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。
被唤醒的线程,将从 await() 方法中的 while 循环中退出。从 await 方法看
// 当前节点已经在同步队列了,不会在循环下去了 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的 await 方法返回。此时线程已经成功获得了锁。
总结
本文剖析了一下 Condition 的实现原理,等待队列,等待,通知的实现原理。