CountDownLatch
- 典型应用场景:主线程启动多个子线程同时执行业务逻辑,所有子线程都执行完毕,再唤醒主线程继续执行。
- 例子:
public class CountDownLatchTest { /** * 计数器,初始为0 */ private Integer count = 0; public Integer getCount(){ return count; } /** * 执行+1操作 */ public void add(){ count++; } public static void main(String[] args){ CountDownLatchTest test = new CountDownLatchTest(); // 线程个数 int threadCount = 3; //初始化工作线程的个数,并用CountDownLatch管理 CountDownLatch countDownLatch = new CountDownLatch(threadCount); for(int i=0;i<threadCount;i++) { new Thread(() -> { test.add(); countDownLatch.countDown(); }).start(); } try { //等待所有线程执行完毕,在所有线程都执行完毕之前主线程会阻塞 countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(test.getCount()); } }
主线程启动了3个子线程执行add操作,等待3个子线程都执行完毕了,主线程继续执行打印最终的执行结果为:3。
3.具体实现原理:
public class CountDownLatch { //继承于AQS的同步器 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //有参构造函数,count记录了共享资源的个数 Sync(int count) { setState(count); } //获取当前共享资源的个数 int getCount() { return getState(); } /** * 尝试以共享方式获取资源 * @return 1表示获取成功,-1表示获取失败 */ protected int tryAcquireShared(int acquires) { //如果当前资源个数为0,则表示获取成功,否则表示失败 return (getState() == 0) ? 1 : -1; } /** * 尝试以共享方式释放资源 * @return true表示释放成功,false表示释放失败 */ protected boolean tryReleaseShared(int releases) { // 对当前资源执行-1操作 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; //CAS更新资源个数,CAS失败表示有其他线程竞争,此时需要重试 if (compareAndSetState(c, nextc)) //执行-1操作后,如果资源个数为0,则表示释放成功 return nextc == 0; } } } private final Sync sync; //有参构造函数,可以看到CountDownLatch中禁用了默认构造函数,意味着必须传入资源个数 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } //等待操作,此方法会使调用线程阻塞,直到其他调用countdown的方法都执行完毕 public void await() throws InterruptedException { //此处调用的是AQS的acquireSharedInterruptibly方法,下文会具体分析 sync.acquireSharedInterruptibly(1); } //和await()类似,但是有一个等待的超时时间,过了超时时间会自动取消等待 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //将state的值-1,当getState()==0时,会唤醒调用await()线程 public void countDown() { //调用AQS的releaseShared方法,下文会具体分析 sync.releaseShared(1); } //获取当前资源的个数 public long getCount() { return sync.getCount(); } }
4.CountDownLatch中用到的AQS的核心方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //尝试获取资源失败(tryAcquireShared的返回值<0),会将当前线程阻塞并排队等待 if (tryAcquireShared(arg) < 0) //该方法会将当前线程阻塞,并放入AQS的同步队列等待,此处不再分析 doAcquireSharedInterruptibly(arg); } //释放共享资源 public final boolean releaseShared(int arg) { //尝试释放共享资源成功时(此处要结合CountDownLatch提供的tryReleaseShared方法理解),进行具体的释放操作 if (tryReleaseShared(arg)) { //AQS提供的执行具体的资源释放操作,会唤醒调用await()方法的线程 doReleaseShared(); return true; } return false; }
5.总结:CountDownLatch使用AQS的state变量作为状态计数器,执行countdown操作的线程会将计数器减1,当前计数器的值为0时(getState()==0),会唤醒执行await操作的线程继续执行。