👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD
🔥 2025本人正在沉淀中... 博客更新速度++
👍 欢迎点赞、收藏、关注,跟上我的更新节奏
📚欢迎订阅专栏,专栏别名《在2B工作中寻求并发是否搞错了什么》
一、入门
什么是线程池?
线程池(Thread Pool)是一种复用线程资源的机制,通过预先创建一定数量的线程,并管理它们的生命周期,避免频繁创建/销毁线程带来的性能开销。
线程池的核心思想是任务提交与执行解耦:你只管提交任务,线程池负责调度和执行。
为什么要线程池?
- 线程创建/销毁成本高
每次创建和销毁线程会消耗系统资源(如内存、CPU),频繁操作会导致性能下降。线程池通过复用已创建的线程,减少开销。 - 避免资源耗尽风险
无限制创建线程可能导致内存溢出(OOM)或CPU过载。线程池通过限制最大线程数,防止系统崩溃。 - 提升响应速度
任务到达时,线程池中可能有空闲线程立即执行,无需等待线程创建。 - 统一管理任务队列
线程池提供任务队列缓冲突发流量,并支持多种拒绝策略(如丢弃任务或抛异常),避免任务丢失或系统过载。
线程池解决了哪些问题?
| 问题 | 线程池的解决方案 |
| -- | -- |
|频繁创建/销毁线程开销大 |线程复用,减少系统调用和资源消耗|
|线程数量不可控 |通过核心线程数、最大线程数等参数限制并发量|
|任务执行缺乏管理 |提供任务队列、拒绝策略、监控接口等统一管理机制|
|系统稳定性差 | 避免资源耗尽,防止OOM或CPU过载 |
线程池的适用场景
- 高并发请求处理
- Web服务器(如Tomcat):每个HTTP请求分配一个线程处理,线程池管理这些线程,避免瞬间流量压垮系统。
- RPC框架(如Dubbo):远程调用通过线程池异步处理,提升吞吐量。
- 异步任务处理
- 后台日志记录、数据清洗等非实时任务,提交到线程池异步执行,不阻塞主线程。
- 定时/周期性任务
- 使用
ScheduledThreadPool
执行定时任务(如每日报表生成),或周期任务(如心跳检测)。
- 使用
- 资源受限场景
- 当系统资源(如数据库连接)有限时,通过线程池控制并发访问数量,防止资源争用。
二、如何使用线程池?
这边将结合,线程池构造方法的几个参数总结来说下线程池的使用,以下是线程池的构造方法。
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
AI 代码解读
1、创建线程池
通过 Executors
工厂类(快捷方式)
// 固定线程数的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 单线程线程池(保证任务顺序执行)
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 可缓存的线程池(线程数自动伸缩)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
AI 代码解读
直接使用 ThreadPoolExecutor
(推荐,更灵活)
可以自定义核心参数(核心线程数、队列类型、拒绝策略等)
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(10), // 任务队列(容量10)
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略(默认抛异常)
);
AI 代码解读
2、阻塞队列
在Java线程池中,阻塞队列(BlockingQueue
) 是任务调度和资源管理的核心组件,直接影响线程池的任务处理策略和性能。
|队列类型 | 数据结构 | 容量限制| 公平性 | 适用场景 |
| -- | -- | -- | -- | -- |
|ArrayBlockingQueue| 数组 |有界(需指定)| 可选| 固定大小队列,严格资源控制|
|LinkedBlockingQueue | 链表 | 可选(默认无界)| 无 |吞吐量优先,任务量波动较大|
|SynchronousQueue | 无存储(直接传递)| 容量为0| 可选 | 直接传递任务,避免队列堆积|
|PriorityBlockingQueue| 堆(优先级队列)| 无界| 无 |按优先级执行任务|
|DelayQueue |优先级堆(延迟)| 无界 |无 |定时任务、延迟执行|
|LinkedTransferQueue |链表 |无界 |可选 |高并发场景下的高效传输队列|
ArrayBlockingQueue
- 特点:基于数组的有界队列,初始化时必须指定容量,支持公平锁(减少线程饥饿)。
- 线程池行为:
- 当核心线程满且队列未满时,任务入队等待。
- 队列满时,若线程数未达最大线程数,创建新线程;否则触发拒绝策略。
- 适用场景:需要严格控制资源使用的场景,如高并发但任务处理时间较短的系统。
// 示例:创建容量为10的ArrayBlockingQueue BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 60, TimeUnit.SECONDS, queue );
AI 代码解读
LinkedBlockingQueue
- 特点:基于链表的队列,默认无界(Integer.MAX_VALUE),也可指定容量(有界)。
- 线程池行为:
- 无界队列:任务永远入队,最大线程数(maximumPoolSize)参数失效,线程数不超过核心线程数。 - 有界队列:行为类似ArrayBlockingQueue,但链表结构更灵活。
AI 代码解读 - 适用场景:任务量波动大且允许一定积压的场景,如异步日志处理。
// 默认无界队列(慎用,可能导致OOM) BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // 有界队列(容量100) BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(100);
AI 代码解读
SynchronousQueue
- 特点:不存储元素,每个插入操作必须等待另一个线程的移除操作(一对一传输)。
- 线程池行为:
- 任务直接交给可用线程,若无空闲线程且未达最大线程数,则创建新线程。 - 若线程数已达最大值,立即触发拒绝策略。
AI 代码解读 - 适用场景:高响应需求,任务处理快且不希望积压,如实时请求处理。
// 使用SynchronousQueue(通常搭配最大线程数为无界或较大值) BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, queue );
AI 代码解读
PriorityBlockingQueue
- 特点:无界优先级队列,任务按自然顺序或自定义Comparator排序。
- 线程池行为:任务按优先级执行,高优先级任务先被处理。
- 适用场景:需要任务调度优先级的场景,如VIP用户请求优先处理。
// 创建优先级队列(需实现Comparable或提供Comparator) BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(10, Comparator.comparing(Task::getPriority));
AI 代码解读
DelayQueue
- 特点:存储Delayed元素,元素在到期后才能被取出。
- 线程池行为:用于执行定时任务或延迟任务。
- 适用场景:定时任务调度,如缓存过期清理、延迟消息处理。
```java
// 示例:延迟任务
public class DelayedTask implements Delayed {
private long executeTime;
public DelayedTask(long delay) {
}this.executeTime = System.currentTimeMillis() + delay;
AI 代码解读
@Override
public long getDelay(TimeUnit unit) {
}return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
AI 代码解读
@Override
public int compareTo(Delayed o) {
}return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
AI 代码解读
}
// 使用DelayQueue
BlockingQueue queue = new DelayQueue();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
### 队列选择策略与最佳实践
1. 任务特性分析
- 短任务、高吞吐 → `LinkedBlockingQueue`(有界)或`ArrayBlockingQueue`。
- 实时性要求高 → `SynchronousQueue`(需合理设置最大线程数)。
- 优先级任务 → `PriorityBlockingQueue`。
- 延迟/定时任务 → `DelayQueue`。
2. 资源限制
- 内存敏感场景 → 有界队列(如`ArrayBlockingQueue`),避免OOM。
- CPU密集型任务 → 较小队列容量(减少上下文切换)。
- IO密集型任务 → 较大队列容量(应对任务堆积)。
3. 拒绝策略配合
- 使用有界队列时,必须设置合理的拒绝策略(如`CallerRunsPolicy`降级)。
## 3、拒绝策略
当线程池的任务队列已满且所有工作线程都在忙碌时,新提交的任务将触发拒绝策略(`RejectedExecutionHandler`)。拒绝策略决定了如何处理这些无法被立即处理的任务,是保障系统稳定性和任务可靠性的关键机制
### AbortPolicy(默认策略)
**行为**:直接抛出 `RejectedExecutionException` 异常,阻止任务提交。
**适用场景**:
- 严格的任务处理要求,不允许任务丢失。
- 需要显式处理异常,确保提交方知道任务被拒绝。
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy() // 默认策略
);
try {
executor.execute(() -> System.out.println("Task"));
} catch (RejectedExecutionException e) {
System.err.println("任务被拒绝: " + e.getMessage());
}
AI 代码解读
CallerRunsPolicy
行为:将任务回退给提交任务的线程(调用者线程)直接执行。
适用场景:
- 生产速度远大于消费速度时,通过调用者线程执行任务,自然降低提交速率。
- 作为临时降级策略,避免任务丢失。
风险:若提交任务的线程是主线程或关键线程,可能导致主线程阻塞。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 主线程可能在队列满时执行任务
executor.execute(() -> heavyTask());
AI 代码解读
DiscardPolicy
行为:静默丢弃被拒绝的任务,无任何通知。
适用场景:
- 允许任务丢失的非关键场景(如日志记录、心跳检测)。
- 任务具有时效性,过期后无意义。
风险:任务丢失可能导致数据不一致,需结合监控使用。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardPolicy()
);
AI 代码解读
DiscardOldestPolicy
行为:丢弃队列中最旧的任务(即队列头部的任务),然后重试提交当前任务。
适用场景:
- 新任务比旧任务优先级更高(如实时数据覆盖历史数据)。
- 队列中旧任务可能已过时或无意义。
风险:可能丢弃重要任务,需谨慎评估业务逻辑。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
AI 代码解读
自定义拒绝策略
通过实现 RejectedExecutionHandler
接口,可以定制符合业务需求的策略,例如记录日志、持久化存储或异步重试。
public class RetryWithLogPolicy implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(RetryWithLogPolicy.class);
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
// 记录任务信息
logger.warn("任务被拒绝: {}", task);
// 异步重试(例如提交到另一个线程池或延迟队列)
if (!executor.isShutdown()) {
executor.submit(task); // 尝试重新提交(可能递归触发拒绝)
// 或使用独立的重试机制:RetryExecutor.execute(task);
}
}
}
// 使用自定义策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 5, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new RetryWithLogPolicy()
);
AI 代码解读
拒绝策略的选择与最佳实践
策略 | 适用场景 | 注意事项 |
---|---|---|
AbortPolicy | 严格不允许任务丢失的场景(如金融交易)。 | 需捕获异常并处理,避免系统崩溃。 |
CallerRunsPolicy | 生产-消费速度不平衡,允许降级处理(如Web服务器请求过载)。 | 避免主线程阻塞,防止级联故障。 |
DiscardPolicy | 非关键任务(如监控埋点、日志记录)。 | 需监控丢弃任务量,避免数据丢失影响业务。 |
DiscardOldestPolicy | 新任务优先级高于旧任务(如实时消息覆盖)。 | 确保旧任务可丢弃,避免重要任务丢失。 |
自定义策略 | 需持久化、重试或通知外部系统(如消息队列回退)。 | 避免无限递归提交,防止资源耗尽。 |
4、线程工厂
线程工厂(ThreadFactory) 是用于创建新线程的核心接口,它允许开发者自定义线程的创建逻辑(如线程名称、优先级、守护线程属性等)。通过合理使用线程工厂,可以提升线程池的可维护性、调试效率和系统稳定性。
为什么需要线程工厂?
线程池使用 Executors.defaultThreadFactory()
创建线程,但这种方式存在以下局限性:
- 线程命名不友好:默认名称为
pool-1-thread-1
,难以快速定位问题。 - 无法统一配置:无法批量设置线程优先级、守护线程属性等。
- 缺乏异常处理:线程内未捕获的异常可能导致程序静默崩溃。
ThreadFactory
是一个函数式接口,只包含一个方法
public interface ThreadFactory {
Thread newThread(Runnable r); // 根据Runnable任务创建新线程
}
AI 代码解读
自定义线程工厂示例:
基础实现(命名和优先级)
public class CustomThreadFactory implements ThreadFactory {
private final String namePrefix; // 线程名前缀(如 "service-")
private final int priority; // 线程优先级
private final AtomicInteger counter = new AtomicInteger(1);
public CustomThreadFactory(String namePrefix, int priority) {
this.namePrefix = namePrefix;
this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 设置线程名称(如 "service-1")
thread.setName(namePrefix + counter.getAndIncrement());
// 设置优先级(如 Thread.NORM_PRIORITY=5)
thread.setPriority(priority);
// 设置非守护线程(默认即为非守护线程)
thread.setDaemon(false);
return thread;
}
}
AI 代码解读
增强实现(全局异常捕获)
public class EnhancedThreadFactory implements ThreadFactory {
private final String namePrefix;
private final Thread.UncaughtExceptionHandler exceptionHandler;
public EnhancedThreadFactory(String namePrefix, Thread.UncaughtExceptionHandler handler) {
this.namePrefix = namePrefix;
this.exceptionHandler = handler;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(namePrefix + "-" + thread.getId());
// 设置未捕获异常处理器
thread.setUncaughtExceptionHandler(exceptionHandler);
return thread;
}
}
AI 代码解读
5、提交任务
提交无需返回值的任务(Runnable)
// 使用execute()提交
customPool.execute(() -> {
System.out.println("Runnable任务执行线程:" + Thread.currentThread().getName());
});
AI 代码解读
提交需要返回值的任务(Callable)
// 使用submit()获取Future对象
Future<Integer> future = customPool.submit(() -> {
Thread.sleep(1000);
return 42;
});
// 获取结果(阻塞等待)
try {
int result = future.get(); // 可设置超时:future.get(2, TimeUnit.SECONDS)
System.out.println("任务结果:" + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("任务执行异常:" + e.getCause());
}
AI 代码解读
提交定时/周期任务
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 延迟5秒执行一次
scheduledPool.schedule(() -> {
System.out.println("延迟任务执行");
}, 5, TimeUnit.SECONDS);
// 固定频率执行(每3秒一次,无视任务耗时)
scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("固定频率任务");
}, 1, 3, TimeUnit.SECONDS);
// 固定间隔执行(任务结束后间隔2秒)
scheduledPool.scheduleWithFixedDelay(() -> {
System.out.println("固定间隔任务");
}, 1, 2, TimeUnit.SECONDS);
AI 代码解读
6、关闭线程池
平缓关闭
等待所有已提交任务完成
customPool.shutdown(); // 停止接收新任务
if (!customPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("线程池未在60秒内关闭");
}
AI 代码解读
强制关闭
立即中断所有任务
List<Runnable> unfinished = customPool.shutdownNow(); // 返回未执行任务列表
AI 代码解读
7、监控当前线程池
在 Java 的 ThreadPoolExecutor
中,确实提供了多个 API 用于监控线程池的状态,包括正在工作的线程数、任务队列长度、已完成任务数等。
|方法 | 作用|
| -- | -- |
|getActiveCount() | 返回正在执行任务的线程数(近似值,可能不实时准确)。|
| getPoolSize() | 返回线程池中当前的线程总数(包括空闲线程)。|
| getCorePoolSize()| 返回核心线程数(配置参数)。|
| getMaximumPoolSize() | 返回最大线程数(配置参数)。|
| getLargestPoolSize() | 返回线程池历史同时存在的最大线程数(峰值)。|
| getCompletedTaskCount() | 返回线程池已完成的任务总数(近似值)。|
| getTaskCount() | 返回线程池已调度执行的任务总数(包括已完成的、正在执行的和队列中的)。
| getQueue() | 返回任务队列对象(BlockingQueue),可进一步获取队列长度等。|
实例代码:
public class ThreadPoolMonitorDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 提交一些任务
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
try {
Thread.sleep(1000); // 模拟任务执行
System.out.println("Task " + taskId + " done.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 启动一个监控线程,定期打印线程池状态
new Thread(() -> {
while (true) {
try {
Thread.sleep(500); // 每0.5秒监控一次
} catch (InterruptedException e) {
break;
}
System.out.println("\n--- 线程池状态监控 ---");
System.out.println("活动线程数: " + executor.getActiveCount());
System.out.println("当前线程总数: " + executor.getPoolSize());
System.out.println("历史最大线程数: " + executor.getLargestPoolSize());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("队列中待处理任务数: " + executor.getQueue().size());
}
}).start();
// 关闭线程池
executor.shutdown();
}
}
AI 代码解读
输出结果
--- 线程池状态监控 --- 活动线程数: 5 当前线程总数: 5 历史最大线程数: 5 已完成任务数: 0 队列中待处理任务数: 10 --- 线程池状态监控 --- 活动线程数: 5 当前线程总数: 5 历史最大线程数: 5 已完成任务数: 5 队列中待处理任务数: 5 (随着任务执行,数值会动态变化)
AI 代码解读
8、处理异常
任务内部捕获异常
customPool.execute(() -> {
try {
// 业务代码
} catch (Exception e) {
System.err.println("任务异常: " + e);
}
});
AI 代码解读
通过Future获取异常
Future<?> future = customPool.submit(() -> {
throw new RuntimeException("模拟异常");
});
try {
future.get();
} catch (ExecutionException e) {
System.err.println("捕获到任务异常: " + e.getCause());
}
AI 代码解读
全局异常处理器
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
System.err.println("全局捕获异常: " + ex);
});
return t;
};
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
factory
);
AI 代码解读
三、与SpringBoot整合
1、快速集成线程池
在 Spring Boot 中使用线程池可以结合 @Async
注解和 ThreadPoolTaskExecutor
实现异步任务调度。
1. 启用异步支持
在启动类或配置类添加 @EnableAsync
@SpringBootApplication
@EnableAsync // 开启异步支持
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
AI 代码解读
2. 配置线程池参数
在 application.yml
中定义线程池参数
async:
thread-pool:
core-size: 4 # 核心线程数
max-size: 8 # 最大线程数
queue-capacity: 100 # 队列容量
keep-alive: 60s # 空闲线程存活时间
thread-name-prefix: async- # 线程名前缀
await-termination: 30s # 关闭时等待任务完成的时间
wait-for-tasks: true # 是否等待剩余任务完成
AI 代码解读
3. 自定义线程池配置类
创建 ThreadPoolConfig
类加载配置
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor(AsyncThreadPoolProperties properties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getCoreSize());
executor.setMaxPoolSize(properties.getMaxSize());
executor.setQueueCapacity(properties.getQueueCapacity());
executor.setKeepAliveSeconds((int) properties.getKeepAlive().getSeconds());
executor.setThreadNamePrefix(properties.getThreadNamePrefix());
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasks());
executor.setAwaitTerminationSeconds((int) properties.getAwaitTermination().getSeconds());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
AI 代码解读
注:通过 @ConfigurationProperties
读取配置(需定义 AsyncThreadPoolProperties
类绑定参数)。
2、使用线程池
使用 @Async
执行异步任务
编写异步服务
在 Service
类中使用 @Async
注解
@Service
public class OrderService {
@Async("taskExecutor") // 指定线程池 Bean 名称
public CompletableFuture<String> processOrderAsync(String orderId) {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return CompletableFuture.completedFuture("订单 " + orderId + " 处理完成");
}
}
AI 代码解读
调用异步方法:在 Controller
或 Service
中调用:
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("/process")
public CompletableFuture<String> processOrder(@RequestParam String orderId) {
return orderService.processOrderAsync(orderId);
}
}
AI 代码解读
手动提交任务到线程池: 注入线程池直接使用
@Service
public class ReportService {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
public void generateReport() {
taskExecutor.execute(() -> {
// 生成报表的逻辑
System.out.println("报表生成线程:" + Thread.currentThread().getName());
});
}
}
AI 代码解读
提交带返回值的任务
public CompletableFuture<String> queryData() {
return CompletableFuture.supplyAsync(() -> {
// 查询数据库或外部接口
return "查询结果";
}, taskExecutor);
}
AI 代码解读