前言
在高并发、多线程的Java服务端开发中,线程间的协作与同步是绕不开的核心命题。JUC(java.util.concurrent)包提供的CountDownLatch、CyclicBarrier、Semaphore等同步工具类,基于AQS队列同步器实现,是解决多线程等待、分组执行、流量控制等场景的标准方案。本文从底层原理、源码核心、实战场景、代码落地、易混淆点对比五个维度,彻底讲透这类并发工具的使用与原理,助力生产环境高效落地。
一、CountDownLatch:等待多线程完成的门栓
1.1 核心原理
CountDownLatch是一个一次性的同步工具,允许一个或多个线程等待其他线程完成操作后再执行。 底层基于AQS共享锁实现,通过AQS的state变量作为计数器:
- 初始化时指定计数器值;
- 线程调用
countDown()方法,计数器减1; - 线程调用
await()方法,会阻塞直到计数器变为0; - 计数器归零后,所有阻塞线程被唤醒,且不可重置复用。
1.2 源码核心要点
- 继承
AbstractQueuedSynchronizer,使用共享模式获取同步状态; await():调用acquireSharedInterruptibly(),state≠0则进入AQS队列阻塞;countDown():调用releaseShared(),state减1,为0时唤醒队列中所有线程。
1.3 生产级实战示例
场景:多线程异步查询数据,主线程等待所有子线程完成后汇总结果。
<!-- pom.xml lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
package com.jam.demo.concurrency;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CountDownLatch实战:多线程数据汇总
* @author ken
*/
@Slf4j
public class CountDownLatchDemo {
/** 线程池 */
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(3);
/**
* 多线程查询并汇总结果
* @param taskCount 任务数量
* @return 汇总结果
*/
public List<String> multiThreadQuery(int taskCount){
List<String> resultList = Lists.newArrayList();
CountDownLatch countDownLatch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
int finalI = i;
EXECUTOR.execute(()->{
try {
// 模拟业务查询
String result = "任务_"+ finalI +"_执行结果";
resultList.add(result);
log.info("子线程{}执行完成",finalI);
}finally {
// 必须在finally中执行,避免异常导致计数无法归零
countDownLatch.countDown();
}
});
}
try {
// 等待所有子线程完成,超时时间5秒
countDownLatch.await();
} catch (InterruptedException e) {
log.error("等待子线程执行被中断",e);
Thread.currentThread().interrupt();
}
return resultList;
}
public static void main(String[] args) {
CountDownLatchDemo demo = new CountDownLatchDemo();
List<String> result = demo.multiThreadQuery(3);
if(CollectionUtils.hasElements(result)){
log.info("主线程汇总结果:{}",result);
}
// 关闭线程池
EXECUTOR.shutdown();
}
}
二、CyclicBarrier:可循环使用的同步屏障
2.1 核心原理
CyclicBarrier意为循环屏障 ,让一组线程到达屏障后阻塞,直到所有线程都到达屏障,才统一放行,且支持重置复用 。 核心特性:
- 支持parties 线程数,所有线程到达后放行;
- 支持barrierAction,放行后优先执行的回调任务;
- 基于
ReentrantLock和Condition实现,而非直接使用AQS; - 计数器归零后可通过
reset()重置,重复使用。
2.2 与CountDownLatch核心区别
- CountDownLatch:计数器一次性使用,线程等待其他线程完成;
- CyclicBarrier:计数器可重置复用,线程间相互等待。
2.3 生产级实战示例
场景:多线程分组计算,所有线程计算完成后统一汇总。
package com.jam.demo.concurrency;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CyclicBarrier实战:分组计算汇总
* @author ken
*/
@Slf4j
public class CyclicBarrierDemo {
/** 线程池 */
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
// 3个线程到达屏障后,执行汇总回调
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
log.info("所有线程计算完成,执行汇总逻辑");
});
for (int i = 0; i < 3; i++) {
int finalI = i;
EXECUTOR.execute(()->{
try {
log.info("线程{}开始计算",finalI);
// 模拟计算逻辑
Thread.sleep(1000);
log.info("线程{}计算完成,等待其他线程",finalI);
// 等待屏障
cyclicBarrier.await();
log.info("线程{}继续执行后续逻辑",finalI);
} catch (InterruptedException | BrokenBarrierException e) {
log.error("屏障等待异常",e);
Thread.currentThread().interrupt();
}
});
}
EXECUTOR.shutdown();
}
}
三、Semaphore:流量控制的信号量
3.1 核心原理
Semaphore即信号量,用于控制同时访问资源的线程数量,实现限流、资源隔离。 底层基于AQS共享锁,核心逻辑:
- 初始化指定许可数
permits; acquire():获取许可,无许可则阻塞;release():释放许可,唤醒阻塞线程;- 支持公平/非公平模式,默认非公平。
3.2 实战示例
场景:接口限流,控制同时执行的线程数。
package com.jam.demo.concurrency;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Semaphore;
/**
* Semaphore接口限流实战
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/concurrency")
@Tag(name = "并发工具类接口")
public class SemaphoreController {
/** 最多允许2个线程同时访问 */
private static final Semaphore SEMAPHORE = new Semaphore(2);
@GetMapping("/limit")
@Operation(summary = "信号量限流接口")
public String limitApi(){
boolean acquire = false;
try {
// 获取许可
acquire = SEMAPHORE.tryAcquire();
if(!acquire){
return "系统繁忙,请稍后再试";
}
// 模拟业务执行
log.info("线程{}获取许可执行业务",Thread.currentThread().getName());
Thread.sleep(2000);
return "执行成功";
} catch (InterruptedException e) {
log.error("接口执行异常",e);
Thread.currentThread().interrupt();
return "执行失败";
}finally {
if(acquire){
// 释放许可
SEMAPHORE.release();
log.info("线程{}释放许可",Thread.currentThread().getName());
}
}
}
}
四、三大并发工具类核心对比
| 特性 | CountDownLatch | CyclicBarrier | Semaphore |
| 复用性 | 不可复用 | 可循环复用 | 可重复获取释放 |
| 核心作用 | 等待其他线程完成 | 线程间相互等待 | 控制并发线程数 |
| 底层实现 | AQS共享锁 | ReentrantLock+Condition | AQS共享锁 |
| 使用场景 | 多任务汇总、启动等待 | 分组计算、分批执行 | 接口限流、资源池控制 |
| 计数器行为 | 只减不增 | 归零后重置 | 动态增减 |
五、生产环境使用注意事项
- CountDownLatch:
countDown()必须放在finally中,避免异常导致永久阻塞; - CyclicBarrier:线程数必须与parties一致,否则会永久等待,可使用
await(timeout)防止死锁; - Semaphore:
release()必须与acquire()成对出现,避免许可泄露; - 超时控制:优先使用带超时的
await(TimeUnit)方法,防止线程无限阻塞; - 异常处理:捕获中断异常,恢复线程中断状态,避免业务逻辑异常。
六、总结
CountDownLatch、CyclicBarrier、Semaphore是JUC最核心的同步工具,三者均基于AQS或锁机制实现,针对不同线程协作场景提供标准解决方案:
- 一次性等待多线程完成:选择CountDownLatch;
- 线程间相互等待、可循环使用:选择CyclicBarrier;
- 控制并发流量、资源访问限制:选择Semaphore。
理解其底层原理与使用边界,才能在高并发场景中写出稳定、高效的多线程代码,避免线程阻塞、死锁、资源耗尽等问题。