CountDownLatch是一种同步辅助工具,允许一个或多个线程等待在其他线程中执行的一组操作完成,可以利用它来实现多线程协调共同完成某些操作,比如多线程共同完成一个大任务。
CountDownLatch 底层维护了一个计数器,所有线程调用countDown()方法后计数器减1,主线程等到直到计数器为0,可以用作发令抢 一个线程等待 所有线程将计数器减少为0,模拟并发的场景。可以一个或者多个线程,等待计数器到0。CountDownLatch的实现很简单,基于AQS实现的共享锁。
AQS实现:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
AI 代码解读
countDown方法:
AI 代码解读
public void countDown() {
sync.releaseShared(1); }
AI 代码解读
countDown方法不断释放共享锁,state减1,然后唤醒下一个排队加锁的线程,state所有线程共享,没有像ReentrantReadWriteLock那样用ThreadLocal隔离不同线程的锁次数。
await方法:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
AI 代码解读
await方法,判断state是否达到了0,如果没有到0则会进入排队和阻塞,实现等待其他线程操作的目的,直到state=0。
Rocketmq扩展:
在Rocketmq中,就对CountDownLatch进行了扩展,添加了重置功能,使得CountDownLatch也可以像CyclicBarrier一样可以循环使用。它将初始化时的state值暂存到startCount上,reset时重新将startCount值设置到state上。
发现CountDownLatch的实现很简单,其实是因为AQS已经提供了很好的抽象,使得实现其他辅助的同步工具类非常简单,因此学好AQS对于学习并发工具是很有用的。