1 基本设计
一种同步辅助,允许一或多个线程等待,直到在其它线程中执行的一组操作完成。
用给定的 count 初始化。由于调用countDown(),await 方法会阻塞,直到当前计数为0,之后释放所有等待线程,并立即返回任何后续的 await 调用。计数无法重置,若需重置计数,可使用CyclicBarrier。
CountDownLatch 是一种通用的同步工具,可用于:
count为1时初始化的CountDownLatch用作简单的门开关:所有调用wait的线程都在门口等待,直到调用countDown()的线程打开它
初始化为N的CountDownLatch可用来让一个线程等待,直到N个线程完成某动作或某动作已完成N次
CountDownLatch的一个有用的特性是,它不需要调用倒计时的线程等待计数达到0才继续,它只是防止任何线程继续等待,直到所有线程都通过。
2 类架构
CountDownLatch并未显式继承什么接口或类。
构造器
构造一个用给定计数初始化的CountDownLatch。
参数 count :在线程通过await()前,必须调用countDown()的次数。
CountDownLatch 的 state 并非 AQS 的默认值 0,而是可赋值的,即在 CountDownLatch 初始化时,count 就代表 state 的初始值。
new Sync(count) 就是调用了内部类 Sync 的如下构造器
count 表示我们希望等待的线程数,可能是等待一组线程全部启动或执行完成。
内部类
和 ReentrantLock 一样,CountDownLatch类也存在一个内部同步器 Sync,继承了 AbstractQueuedSynchronizer
唯一字段:
private static final class Sync extends AbstractQueuedSynchronizer { ... // 返回当前计数 int getCount() { return getState(); } // 共享模式下获取锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 共享模式下的释放锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取锁状态 int c = getState(); // 锁未被任何线程持有 if (c == 0) return false; // 对 state 进行递减,直到 state 变成 0;当 state 递减为 0 时,才返回 true int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
3 await
可以叫做等待,也可以称为加锁。
3.1 无参
使得当前线程等待,直到锁存器 count 为0,除非线程被中断。
若当前计数为零,则此方法立即返回。
若当前线程数>0,则当前线程将出于线程调度的目的而被禁用,并处于睡眠状态,直到发生如下情况之一:
由于调用countDown()方法,count为0
其他线程中断了当前线程
若当前线程:
在进入此方法时,已设置其中断状态
或在等待时被中断
就会抛 InterruptedException,并清除当前线程的中断状态。
无参版 await 内部使用的是 acquireSharedInterruptibly 方法,实现在 AQS 中的 final 方法:
使用CountDownLatch 的内部类 Sync 重写的tryAcquireShared 尝试获得锁,若获取到锁直接返回,获取不到走2
获取锁失败,用 Node 封装一下当前线程,追加到同步队列尾部,等待在合适时机去获得锁,本步已完全实现在 AQS 中
3.2 带超时参数
最终都会转化成ms
相比于无参版本,或者指定的等待时间已过。
如果当前计数为零,则此方法立即返回值 true。
如果当前线程数大于0,则当前线程将出于线程调度的目的而禁用,并处于休眠状态,直到发生以下三种情况之一:
由于调用了countDown()方法,计数为零;或
其他一些线程中断当前线程;或
指定的等待时间已经过了
如果计数为零,则该方法返回值true。
若当前线程:
在进入此方法时已设置其中断状态;或
在等待时中断,
就会抛出InterruptedException,并清除当前线程的中断状态。
如果指定的等待时间过期,则返回false值。如果时间小于或等于0,则该方法根本不会等待。
使用的是 AQS# tryAcquireSharedNanos 方法
获得锁时,state 的值不会发生变化,像 ReentrantLock 在获得锁时,会把 state + 1,但 CountDownLatch 不会
4 countDown
降低锁存器的计数,如果计数为 0,则释放所有等待的线程。
如果当前计数大于零,则递减。如果新计数为零,那么所有等待的线程都将重新启用,以便进行线程调度。
如果当前计数等于0,则什么也不会发生。
releaseShared 已经完全实现在 AQS
主要分成两步:
- 尝试释放锁(tryReleaseShared),锁释放失败直接返回,释放成功走 2,本步由 Sync 实现
- 释放当前节点的后置等待节点,该步 AQS 已经完全实现
5 最佳实践
Kafka中的线程控制代码大量使用CountDownLatch实现优雅的线程启动、线程关闭等操作。
6 总结
研究完 CountDownLatch 的源码,可知其底层结构仍然依赖了 AQS,对其线程所封装的结点是采用共享模式,而 ReentrantLock 是采用独占模式。