简介
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步(即:用于线程之间的通信而不是互斥)。它允许一个或多个线程进入等待状态,直到其他线程执行完毕后,这些等待的线程才继续执行。
CountDownLatch通过一个计数器来实现,其中维护了一个count变量和操作该变量的两个主要方法:
- await()方法:线程调用await()方法,会使调用该方法的线程进入阻塞状态,并将其加入到阻塞队列中。
- countDown()方法:线程调用countDown()方法,会将CountDownLatch中count的值-1。当count变量的值递减为0,会唤醒阻塞队列中调用await()方法的线程继续执行业务处理。
应用场景
CountDownLatch是一种非常实用的并发控制工具,它的主要应用场景:
主线程等待多个子线程完成任务处理。如:主线程等待其他线程各自完成任务处理后,再继续执行。
实现多个线程开始执行任务处理的最大并行性(注意:是并行而非并发)。如:多个线程需要在同一时刻开始执行任务处理,可以通过如下方式实现:
1)初始化一个的CountDownLatch变量(计数器的初始化值为1)。
2)需要在同一时刻执行任务处理的所有线程调用CountDownLatch.await()方法进入阻塞状态。
3)主线程调用CountDownLatch.countDown()方法将计数器-1(此时计数器的值为0),唤醒所有调用CountDownLatch.await()方法进入阻塞状态的线程开始执行任务处理。
实现原理
CountDownLatch中定义了一个Sync类型的变量和操作该变量的方法。
源码如下:
// Sync类型的同步变量
private final Sync sync;
// 构造函数,用于初始化CountDownLatch计数器
public CountDownLatch(int count) {
...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程被其他线程中断。
public void await() throws InterruptedException {
...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程等待超时或者被其他线程中断。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
...}
// 递减AQS中的state(计数器)值,如果state的值递减为0,则唤醒调用await()方法进入阻塞的线程。
public void countDown() {
...}
// 返回state的值。
public long getCount() {
...}
// 返回标识CountDownLatch及其计数器值的字符串。
public String toString() {
...}
其中,最重要的是sync类型的变量、await()和countDown()方法。
Sync
Sync是CountDownLatch的静态内部类器,它继承了
AbstractQueuedSynchronizer(AQS),主要用于CountDownLatch的同步状态,创建CountDownLatch时进行初始化。
CountDownLatch构造函数:
public CountDownLatch(int count) {
// 如果传入的count值小于0,则抛出IllegalArgumentException异常
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化Sync
this.sync = new Sync(count);
}
初始化Sync时,将传入的count参数值赋值给AQS的同步状态state,state是一个volatile修饰的int值,一个线程修改了state值,其他线程能够立刻感知,从而保证state值在并发场景下的可见性。
同时,Sync实现了AQS的tryAcquireShared()和tryReleaseShared()方法:
java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
// 尝试获取共享资源
protected int tryAcquireShared(int acquires) {
/**
* 用于根据state(计数器)的值来尝试获取共享资源:
* state的值为0,返回1,表示可以获取共享资源。
* state的值不为0,返回-1,表示无法获取共享资源。
*/
return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享资源(AQS)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取当前state的值
int c = getState();
// 如果state的值为0,则返回false(即:没有需要释放的资源)
if (c == 0)
return false;
// 如果state的值大于0,则将state的值-1,并通过CAS的方式更新state的最新值
int nextc = c-1;
if (compareAndSetState(c, nextc))
/**
* 返回资源释放结果:
* 释放资源后state的值为0,则返回true,表示可以唤醒调用await()方法进入阻塞的线程。
* 释放资源后state的值不为0,则返回false,表示继续阻塞调用await()方法的线程,直到state的值被减为0。
*/
return nextc == 0;
}
}
await方法
CountDownLatch通过CountDownLatch#await方法调用CountDownLatch.Sync#tryAcquireShared方法尝试获取共享资源:
- 获取到共享资源,则唤醒调用await()方法的线程执行业务处理。
- 获取不到共享资源,则继续阻塞调用await()方法的线程,直到state的值递减为0(即:其他线程释放完共享资源)。
CountDownLatch#await方法源码解析:
// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
// 获取共享资源(可中断)
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
// 获取共享资源(AQS)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程被其他线程中断,则抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
// 具体由继承AQS的Sync的tryAcquireShared()方法实现
if (tryAcquireShared(arg) < 0)
// 如果获取共享资源锁失败,则将当前线程封装成Node节点追加到CLH队列的末尾,等待被唤醒(即:进入阻塞)
doAcquireSharedInterruptibly(arg);
}
其中,
doAcquireSharedInterruptibly()方法源码解析请移步主页查阅->「一文搞懂」AQS(抽象队列同步器)实现原理及源码解析。
countDown方法
CountDownLatch通过CountDownLatch#countDown方法调用CountDownLatch.Sync#tryReleaseShared方法尝试释放共享资源:
- 如果释放某个共享资源后state的值为0,则唤醒调用await()方法的线程执行业务处理。
- 如果释放某个共享资源后state的值不为0,则继续阻塞调用await()方法的线程,直到state的值被减为0。
CountDownLatch#countDown方法源码解析:
// java.util.concurrent.CountDownLatch#countDown
public void countDown() {
// 释放共享资源(数量为1)
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 释放共享资源(AQS)
public final boolean releaseShared(int arg) {
// 具体由继承AQS的Sync的tryReleaseShared()方法实现
if (tryReleaseShared(arg)) {
// 唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
使用示例
主线程等待子线程完成处理
/**
* @author 南秋同学
* 主线程等待多个子线程完成任务处理
*/
@Slf4j
public class CountDownLatchExample {
@SneakyThrows
public static void main(String[] args) {
// 初始化一个的CountDownLatch变量(计数器的初始化值为5)
CountDownLatch cdl = new CountDownLatch(5);
// 初始化一个固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5 ; i++){
// 创建Runnable线程
Runnable runnable = new Runnable() {
@SneakyThrows
@Override
public void run() {
log.info("子线程-{}开始执行...", Thread.currentThread().getName());
Thread.sleep((long) (Math.random() * 10000));
log.info("子线程-{}执行完成", Thread.currentThread().getName());
cdl.countDown();
}
};
service.execute(runnable);
}
log.info("主线程-{}等待所有子线程执行完成...",Thread.currentThread().getName());
cdl.await();
log.info("所有子线程执行完成,开始执行主线程-{}",Thread.currentThread().getName());
}
}
执行结果:
14:13:01.299 [pool-1-thread-3] - 子线程-pool-1-thread-3开始执行...
14:13:01.299 [pool-1-thread-2] - 子线程-pool-1-thread-2开始执行...
14:13:01.299 [main] - 主线程-main等待所有子线程执行完成...
14:13:01.299 [pool-1-thread-1] - 子线程-pool-1-thread-1开始执行...
14:13:01.299 [pool-1-thread-5] - 子线程-pool-1-thread-5开始执行...
14:13:01.299 [pool-1-thread-4] - 子线程-pool-1-thread-4开始执行...
14:13:02.739 [pool-1-thread-3] - 子线程-pool-1-thread-3执行完成
14:13:03.792 [pool-1-thread-1] - 子线程-pool-1-thread-1执行完成
14:13:04.752 [pool-1-thread-5] - 子线程-pool-1-thread-5执行完成
14:13:07.761 [pool-1-thread-4] - 子线程-pool-1-thread-4执行完成
14:13:10.384 [pool-1-thread-2] - 子线程-pool-1-thread-2执行完成
14:13:10.385 [main] - 所有子线程执行完成,开始执行主线程-main
多线程最大并行处理
/**
* @author 南秋同学
* 实现多个线程开始执行任务处理的最大并行性
*/
@Slf4j
public class CountDownLatchExample {
@SneakyThrows
public static void main(String[] args) {
// 初始化一个的CountDownLatch变量(计数器的初始化值为1)
CountDownLatch referee = new CountDownLatch(1);
// 初始化一个的CountDownLatch变量(计数器的初始化值为5)
CountDownLatch sportsman = new CountDownLatch(5);
// 初始化一个固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5 ; i++){
// 创建Runnable线程
Runnable runnable = new Runnable() {
@SneakyThrows
@Override
public void run() {
log.info("运动员-{},等待裁判发布开始口令", Thread.currentThread().getName());
referee.await();
log.info("运动员-{},收到裁判发布的开始口令,起跑...", Thread.currentThread().getName());
Thread.sleep((long) (Math.random() * 10000));
log.info("运动员-{}到达终点", Thread.currentThread().getName());
sportsman.countDown();
}
};
service.execute(runnable);
}
log.info("裁判-{}准备发布开始口令...",Thread.currentThread().getName());
Thread.sleep((long) (Math.random() * 10000));
referee.countDown();
log.info("裁判-{}已经发布开始口令,等待所有选手达到终点...",Thread.currentThread().getName());
sportsman.await();
log.info("所有运动员达到终点,裁判-{}开始计分",Thread.currentThread().getName());
}
}
执行结果:
13:56:14.683 [pool-1-thread-3] - 运动员-pool-1-thread-3,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-2] - 运动员-pool-1-thread-2,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-5] - 运动员-pool-1-thread-5,等待裁判发布开始口令
13:56:14.683 [main] - 裁判-main准备发布开始口令...
13:56:14.683 [pool-1-thread-1] - 运动员-pool-1-thread-1,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-4] - 运动员-pool-1-thread-4,等待裁判发布开始口令
13:56:18.205 [main] - 裁判-main已经发布开始口令,等待所有选手达到终点...
13:56:18.205 [pool-1-thread-2] - 运动员-pool-1-thread-2,收到裁判发布的开始口令,起跑...
13:56:18.205 [pool-1-thread-3] - 运动员-pool-1-thread-3,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-5] - 运动员-pool-1-thread-5,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-4] - 运动员-pool-1-thread-4,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-1] - 运动员-pool-1-thread-1,收到裁判发布的开始口令,起跑...
13:56:22.110 [pool-1-thread-4] - 运动员-pool-1-thread-4到达终点
13:56:23.866 [pool-1-thread-1] - 运动员-pool-1-thread-1到达终点
13:56:26.803 [pool-1-thread-3] - 运动员-pool-1-thread-3到达终点
13:56:28.019 [pool-1-thread-5] - 运动员-pool-1-thread-5到达终点
13:56:28.178 [pool-1-thread-2] - 运动员-pool-1-thread-2到达终点
13:56:28.179 [main] - 所有运动员达到终点,裁判-main开始计分