2.3 丢失的信号
保证notify一定在wait之后
2.4 通知
调用notify和notifyAll也得持有与条件队列对象相关联的锁。调用notify,JVM Thread Scheduler在这个条件队列上等待的多个线程中选择一个唤醒,而notifyAll则会唤醒所有线程。因此一旦notify了那么就需要尽快的释放锁,否则别人都竞争等着拿锁,都会进行blocked的状态,而不是线程挂起waiting状态,竞争都了不是好事,但是这是你考了性能因素和安全性因素的一个矛盾,具体问题要具体分析。
下面的方法可以进来减少竞争,但实现有些难写,所以还得折中考虑:
/
public synchronized void alternatePut(V v) throws InterruptedException { while (isFull()) wait(); boolean wasEmpty = isEmpty(); doPut(v); if (wasEmpty) notifyAll(); }
使用notify容易丢失信号,所以大多数情况下用notifyAll,比如take notify,却通知了另外一个take,没有通知put,那么这就是信号丢失,是一种“被劫持的”信号。
因此只有满足下面两个条件,才能用notify,而不是notifyAll:
- 所有等待线程的类型都相同
- 单进单出
2.5 示例:阀门类A Gate Class
和第5章的那个TestHarness中使用CountDownLatch类似,完全可以使用wait/notifyAll做阀门。
@ThreadSafe public class ThreadGate { // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n) @GuardedBy("this") private boolean isOpen; @GuardedBy("this") private int generation; public synchronized void close() { isOpen = false; } public synchronized void open() { ++generation; isOpen = true; notifyAll(); } // BLOCKS-UNTIL: opened-since(generation on entry) public synchronized void await() throws InterruptedException { int arrivalGeneration = generation; while (!isOpen && arrivalGeneration == generation) wait(); } }
3 Explicit Condition Objects
Lock是一个内置锁的替代,而Condition也是一种广义的内置条件队列。
Condition的API:
public interface Condition { void await() throws InterruptedException; boolean await(long time, TimeUnit unit)throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列。所以在BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,所以notifyAll经常通知不是同一个类型的需求。如果想编写一个带有多个条件谓词的并发对象,或想获得除了条件队列可见性之外的更多的控制权,可以使用Lock和Condition,而非内置锁和条件队列,这更加灵活。
一个Condition和一个lock关联,想象一个条件队列和内置锁关联一样。在Lock上调用newCondition就可以新建无数个条件谓词,这些condition是可中断的、可有时间限制的,公平的或者非公平的队列操作。
The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll。
下面的例子就是改造后的BoundedBuffer:
/
@ThreadSafe public class ConditionBoundedBuffer <T> { protected final Lock lock = new ReentrantLock(); // CONDITION PREDICATE: notFull (count < items.length) private final Condition notFull = lock.newCondition(); // CONDITION PREDICATE: notEmpty (count > 0) private final Condition notEmpty = lock.newCondition(); private static final int BUFFER_SIZE = 100; @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE]; @GuardedBy("lock") private int tail, head, count; // BLOCKS-UNTIL: notFull public void put(T x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[tail] = x; if (++tail == items.length) tail = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // BLOCKS-UNTIL: notEmpty public T take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); T x = items[head]; items[head] = null; if (++head == items.length) head = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
注意这里使用了signal而不是signalll,能极大减少每次缓存操作中发生的上下文切换和锁请求次数。
使用condition和内置锁和条件队列一样,必须保卫在lock里面。
4 Synchronizer剖析
看似ReentrantLock和Semaphore功能很类似,每次只允许一定的数量线程通过,到达阀门时:
- 可以通过 lock或者acquire
- 等待,阻塞住了
- 取消tryLock,tryAcquire
- 可中断的,限时的
- 公平等待和非公平等待
下面的程序是使用Lock做一个Mutex也就是持有一个许可的Semaphore。
@ThreadSafe public class SemaphoreOnLock { private final Lock lock = new ReentrantLock(); // CONDITION PREDICATE: permitsAvailable (permits > 0) private final Condition permitsAvailable = lock.newCondition(); @GuardedBy("lock") private int permits; SemaphoreOnLock(int initialPermits) { lock.lock(); try { permits = initialPermits; } finally { lock.unlock(); } } // BLOCKS-UNTIL: permitsAvailable public void acquire() throws InterruptedException { lock.lock(); try { while (permits <= 0) permitsAvailable.await(); --permits; } finally { lock.unlock(); } } public void release() { lock.lock(); try { ++permits; permitsAvailable.signal(); } finally { lock.unlock(); } } }
实际上很多J.U.C下面的类都是基于AbstractQueuedSynchronizer (AQS)构建的,例如CountDownLatch, ReentrantReadWriteLock, SynchronousQueue和FutureTask(java7之后不是了)。AQS解决了实现同步器时设计的大量细节问题,例如等待线程采用FIFO队列操作顺序。AQS不仅能极大极少实现同步器的工作量,并且也不必处理竞争问题,基于AQS构建只可能在一个时刻发生阻塞,从而降低上下文切换的开销,提高吞吐量。在设计AQS时,充分考虑了可伸缩性。