【1】CountDownLatch是什么
CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:
确保某个计算在其需要的所有资源都被初始化之后才继续执行;
确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
等待直到某个操作所有参与者都准备就绪再继续执行。
CountDownLatch有一个正数计数器,countDown()方法对计数器做减操作,await()方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。
闭锁(倒计时锁)主要用来保证完成某个任务的先决条件满足。是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
CountDownLatch同样依赖队列同步器AbstractQueuedSynchronizer,其类方法如下:
其内部类Sync同样继承了AQS并重写了tryAcquireShared和tryReleaseShared方法。同时也可以表明CountDownLatch是基于共享锁模式的。
CountDownLatch 的两种典型用法
①某一线程在开始运行前等待n个线程执行完毕。
将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
②实现多个线程开始执行任务的最大并行性。
注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。如下示例,在多线程运行情况下,计算多线程耗费时间:
public class TestCountDownLatch { public static void main(String[] args){ LatchDemo latchDemo = new LatchDemo(); long begin = System.currentTimeMillis(); //多线程 for (int i = 0; i <5 ; i++) { new Thread(latchDemo).start(); } //主线程 long end = System.currentTimeMillis(); System.out.println("耗费时间:"+(end-begin)); } } class LatchDemo implements Runnable{ @Override public void run() { for (int i = 0; i < 50000; i++) { if (i%2==0){ System.out.println(i); } } } }
如上示例,很显然让不能计算出多线程运行的时间!!这时,就可以使用 闭锁解决这个问题。
【2】CountDownLatch源码分析与使用
① 修改【1】中代码如下:
public class TestCountDownLatch { public static void main(String[] args){ //CountDownLatch 为唯一的、共享的资源 final CountDownLatch latch = new CountDownLatch(5); LatchDemo latchDemo = new LatchDemo(latch); long begin = System.currentTimeMillis(); for (int i = 0; i <5 ; i++) { new Thread(latchDemo).start(); } try { //多线程运行结束前一直等待 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("耗费时间:"+(end-begin)); } } class LatchDemo implements Runnable{ private CountDownLatch latch; public LatchDemo(CountDownLatch latch){ this.latch=latch; } public LatchDemo(){ super(); } @Override public void run() { //当前对象唯一,使用当前对象加锁,避免多线程问题 synchronized (this){ try { for (int i = 0; i < 50000; i++) { if (i%2==0){ System.out.println(i); } } }finally { //保证肯定执行 latch.countDown(); } } } }
测试结果如下图:
② CountDownLatch源码如下:
package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class CountDownLatch { /** * CountDownLatch的同步控制,同样依赖使用AQS,使用AQS state表示count计数 */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { //设置初始计数 setState(count); } int getCount() { //获取当前计数 return getState(); } //这里重写了AQS的tryAcquireShared方法,小于0表示count>0,可以获取。 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { //count-1,如果count变为0,则唤醒所有 for (;;) { //获取当前状态,为0表示未锁,不用释放 int c = getState(); if (c == 0) return false; int nextc = c-1; //使用CAS算法,c为期望值,nextc为更新值 if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; /** 使用给定的count构造CountDownLatch,count表示线程通过await前必须要执行的次数,count不能小于0 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** 让当前线程等待直到count减数为0,除非线程被中断。如果count为0,线程将立即返回--不再阻塞等待。 如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下两种情况之一: 1.countDown方法调用导致count减数为0; 2.别的线程中断了当前线程 * 线程等待时,如果被中断将会抛出InterruptedException异常 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 使当前线程处理等待状态直到count减为0或者指定等待时间过去。 如果当前count是0,则线程立即返回true。 如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下三种情况之一: 1.countDown方法调用导致count减数为0; 2.别的线程中断了当前线程 3.指定等待时间过去 如果等待时间过去但是count>0,则返回false。如果等待时间时间小于或等于零,方法将不会等待。 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 将count-1 ,如果count减一后为0 ,则释放所有等待的线程 */ public void countDown() { sync.releaseShared(1); } /** 返回当前的count,该方法通常用在debug或者测试中 */ public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
简单总结如下:
内部类Sync同样继承AQS;
AQS的state代表count;
初始化使用计数器count;
count代表多个线程执行或者某个操作执行次数;
countDown()方法将会将count-1;
count为0将会释放所有等待线程;
await方法将会阻塞直到count为0;
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。
count不为0,但是等待时间过去将会返回false。
开关锁应用;
问题分解应用–并行性;
③ CountDownLatch Javadoc
一个同步助手使一个或多个线程在其他线程完成一系列操作前一直处于等待状态。CountDownLatch使用一个count初始化。await方法将会使线程阻塞直到由于countDown方法导致count计数达到0。如果count减为0,所有等到的线程将会被释放并立即返回到后续调用。这是一次性的现象,不能重置计数。如果需要重新设置count,可以使用CyclicBarrier–栅栏。
CountDownLatch是一个多能力的同步工具可以被使用到许多目的。
使用 one 作为count值初始化的CountDownLatch可以被用作开关锁或着门:所有线程在这个门前等待(await方法)直到某个线程调用countDown将门打开。
使用N作为count值初始化的CountDownLatch可以被用于使某个线程等待N个线程完成某个动作或者某个动作被完成N次。
CountDownLatch的一个有用特性是,它不要求调用countDown的线程在继续之前等待计数达到零,它只是防止任何线程通过直到所有线程都可以通过。
④ 官方实例一
这里有两个类,其中一组worker使用两个倒计时锁存器:第一个锁是一个开始信号–阻止任何worker直到driver准备好;第二个锁是一个完成信号–允许driver等待直到所有的worker都完成。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }}
⑤ 官方实例二
另一个典型的用法是将问题分成N个部分,用Runnable描述每个部分,Runnable执行该部分并在锁存器上倒计时,并将所有Runnable排队给Executor。当所有子部件完成时,协调线程将能够通过等待(如果需要使用这种形式,建议使用CyclicBarrier)。
class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = Executors.newScheduledThreadPool(N) for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }}
【3】CountDownLatch中Sync使用AQS的相关方法
① await()方法中的acquireSharedInterruptibly(1)
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
AQS中acquireSharedInterruptibly(1)源码如下:
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果小于0,就执行获取操作 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
② await(long timeout, TimeUnit unit)中的tryAcquireSharedNanos(1, unit.toNanos(timeout))
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
AQS中tryAcquireSharedNanos(int arg, long nanosTimeout)源码如下:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //很有意思,这里用了 || return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
③ countDown()中的releaseShared(int arg)
public void countDown() { sync.releaseShared(1); }
AQS中releaseShared(int arg)源码如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }