CountDownLatch和ReentrantLock有很多相似的地方,因为相似部分内容已经在博客《ReentrantLock详解》中讨论过了,本文不会再次详细讨论。
CountDownLatch是一个计数(构造函数中指定此数值)的锁,当通过countDown方法将此计数值减为0时会唤醒之前调用await的线程。一般用于当某些任务执行完后,在执行其他任务的场景中。
一 CountDownLatch是什么
1 原理分析
CountDownLatch是一个同步的辅助类,它能够使一个线程等待其他线程完成各自的工作后再执行。
CountDownLatch是基于AbstractQueuedSynchronizer(AQS)实现的,其通过state作为计数器。构造CountDownLatch时初始化一个state,以后每调用countDown方法一次,state减1;当state=0时,唤醒在await上被挂起的线程。
CountDownLatch的计数器state不能被重置,如果需要一种能重置count的版本,可以考虑使用CyclicBarrier。
2 用途
CountDownlatch是一个多功能的同步工具,可以被用于各种目的。
一个CountDownLatch通过一个值为1的count被初始化,来作为一个开/关的门或门闩:所有调用了await()的线程都会在门前等待,直到门被一个线程通过调用countDown()打开。
一个被初始化为N的CountDownLatch可以被用来“在N个线程都完成了某种操作(或者一些操作已经被完成了N次)之后创建一个线程”。
二 源码分析
1 构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch是通过一个计数器来实现的,计数器的初始值为等待线程数量。
2 等待信号
1) await
此方法将导致当前线程等待,直到CountDownLatch通过countDown()方法使计数器值为0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
2) acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
3) tryAcquireShared
如果state=0,即计数器的值为0,返回1,表示不用等待;否则返回-1,可表示还需要等待。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
4) doAcquireSharedInterruptibly
此方法的主要作用是让获取不到锁的线程挂起,通过向等待队列中添加一个Node(此Node与当前线程相关联);接着判断是应该唤醒还是挂起。如果node是队列中第一个有效节点,那么唤醒对应的线程;否则通过LockSupport挂起线程。具体逻辑如下:
创建一个SHARED类型的节点,加入到等待队列中。
接下来无限循环,尝试进行以下操作,直到获取到锁或者因为取消获取锁而被唤醒。
如果node是队列中第一个等待线程,那么尝试获取读锁,获取成功后更新队列的head,如果后一个节点也是等待读锁,那么后面一个节点的线程线程。这一过程实现读锁共享。
判断是否应该挂起当前节点对应的线程,如果应该挂起,则通过LockSupport挂起线程。
线程被唤醒以后,设置中断标志位。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3 计数器减1
计数器减1操作,当计数器减到0时,调用await的线程将被唤醒。
1) countDown
countDown是将计数器减1,其本质是释放一次锁。
public void countDown() {
sync.releaseShared(1);
}
2) releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3) tryReleaseShared
释放锁,即将计数器-1;如果减-1后state=0,则表示全部的锁已经释放完了,当且仅当此时返回true。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
4) doReleaseShared
唤醒head后的一个处于挂起状态的线程。主要逻辑如下:
- 如果head的waitStatus=SIGNAL那么修改head.waitStatus=0,然后唤醒head后面的一个等待被唤醒的线程。
- 如果head的waitStatus=0那么修改head.waitStatus=PROPAGATE
- 循环进行以上两个操作直到成功、并且head不曾变动过方才退出循环。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
4 PROPAGATE行为
当通过countDown操作将计数器的值改为0以后,通过doReleaseShared()方法中unparkSuccessor(h)代码的唤醒等待队列中的一个等待线程。
当第一个线程被唤醒以后,继续执行doAcquireSharedInterruptibly()方法的for循环,在此循环中执行到setHeadAndPropagate方法时,会将更新head,传播唤醒操作。
setHeadAndPropagate传播行为的逻辑如下:
- 设置将此次唤醒Node的线程信息清空,然后设置为新的head,
- 唤醒head后的Node对应的线程。唤醒时重新执行setHeadAndPropagate。
三 示例
以下示例中,我们初始化CountDownLatch的信号量是3,那么只有调用三次countDownLatch.countDown()时,之前通过countDownLatch.await()的线程才能被唤醒。
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
String name = "T-"+i;
Thread thread = new Thread(() -> {
try {
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始获取资源");
doTask(name);
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 资源释放完成");
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
}
try {
System.out.println("时间="+DateUtil.getCurrentTime()+"准备等待工作线程执行");
countDownLatch.await();
System.out.println("时间="+DateUtil.getCurrentTime()+"工作线程已经执行完");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void doTask(String name){
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始使用资源,执行任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
以下是输出结果,可以看到主线程在等待3个工作线程执行完以后才开始执行。
时间=11:05:01:952 -工作线程- thread=T-2 开始获取资源
时间=11:05:01:991 -工作线程- thread=T-2 开始使用资源,执行任务
时间=11:05:01:952 -工作线程- thread=T-0 开始获取资源
时间=11:05:01:994 -工作线程- thread=T-0 开始使用资源,执行任务
时间=11:05:01:952 准备等待工作线程执行
时间=11:05:01:952 -工作线程- thread=T-1 开始获取资源
时间=11:05:01:997 -工作线程- thread=T-1 开始使用资源,执行任务
时间=11:05:02:993 -工作线程- thread=T-2 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-0 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-1 资源释放完成
时间=11:05:02:999 工作线程已经执行完