1 使用计数器锁实现任务计数
多任务同步神器,它允许一个或多个线程,等待其它线程完成工作,比如我们现在有一个需求:
- 有20个任务,需要将每个任务的执行结果算出来,但是每个任务执行的时间未知。
- 当所有的任务执行结束后,立即整合统计所有的执行结果。
我们并不知道任务可以在什么时间完成,因此执行统计的时间不好设置,设置短了则还有任务没有完成,设置长了则统计延迟。
CountdownLatch
可以做到,它是一个实现子任务同步的工具。Demo如下
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(20); for (int i = 0; i < 20; i++) { final int finalI = i; new Thread(() ->{ try { Thread.sleep((long)(2000 + new Random().nextDouble())); System.out.println("thread " + finalI + " finished"); } catch (InterruptedException exception) { exception.printStackTrace(); } latch.countDown(); // 相当于计数器,每次减少1 }).start(); } latch.await(); //可以多个线程同时等待,这里仅演示了一个线程进行等待 System.out.println("all sub task finished!"); }
执行结果如下
其实它就是一个线程计数器,注意CountDownLatch
是一次性的,不能重复使用。比如下面再多调用一次latch.await
,程序还是正常结束的(毕竟计数不可逆,已经是0了,而且无法将计数器重置).
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(20); for (int i = 0; i < 20; i++) { final int finalI = i; new Thread(() ->{ try { Thread.sleep((long)(2000 + new Random().nextDouble())); System.out.println("thread " + finalI + " finished"); } catch (InterruptedException exception) { exception.printStackTrace(); } latch.countDown(); }).start(); } latch.await(); latch.await(); System.out.println("all sub task finished!"); }
2 await的源码剖析
上面已经演示了使用,下面来看看它的原理吧。
public class CountDownLatch { // 使用了AQS,不过是基于共享锁实现的 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //这里实际上使用count作为了共享锁的state值 // count数与共享锁的数量相同 // 每调用一次countdown就是解一层锁 Sync(int count) { setState(count); } int getCount() { return getState(); } // 重写了共享锁的实现 // 获取共享锁其实就是等待其它线程把state减到0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 解锁的过程 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); //构造sync方法 } //通过acquireSharedInterruptibly获取共享锁, // 但是如果state不为0,将会被持续阻塞,后文详解 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 同上,但是会被阻塞 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //countDown其实就是解锁一次 public void countDown() { sync.releaseShared(1); } // 获取当前的计数,也就是AQS的state值 public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; }
我们终于知道CountDownLatch它的原理了,原来它就是利用共享锁来实现计数的,锁的数量就是计数数量,countdown的过程就是解锁的过程。
我们在该系列博客已经介绍过独占锁,但是没有深入剖析过共享锁,这里我们来深入共享锁的源码进行剖析,以便大家对CountDownLatch具有更为深入的理解。
点进AbstractQueuedSynchronizer
类中。先看看acquireShared
获取共享锁,这个就是CountDownLatch
中await
方法调用的底层方法(实际上是acquireSharedInterruptibly
,不过原理是一样的)。
public final void acquireShared(int arg) { // 尝试获取共享锁,小于0则失败 if (tryAcquireShared(arg) < 0) // 获取共享锁失败,进入阻塞 doAcquireShared(arg); }