本文已收录在Github,关注我,紧跟本系列专栏文章,咱们下篇再续!
- 🚀 魔都架构师 | 全网30W技术追随者
- 🔧 大厂分布式系统/数据中台实战专家
- 🏆 主导交易系统百万级流量调优 & 车联网平台架构
- 🧠 AIGC应用开发先行者 | 区块链落地实践者
- 🌍 以技术驱动创新,我们的征途是改变世界!
- 👉 实战干货:编程严选网
说说你对线程池的理解?
首先明确,池化的意义在于缓存,创建性能开销较大的对象,比如线程池、连接池、内存池。预先在池里创建一些对象,使用时直接取,用完就归还复用,使用策略调整池中缓存对象的数量。
Java创建对象,仅是在JVM堆分块内存,但创建一个线程,却需调用os内核API,然后os要为线程分配一系列资源,成本很高,所以线程是一个重量级对象,应避免频繁创建或销毁。 既然这么麻烦,就要避免呀,所以要使用线程池!
一般池化资源,当你需要资源时,就调用申请线程方法申请资源,用完后调用释放线程方法释放资源。但JDK的线程池根本没有申请线程和释放线程的方法。
那到底该如何理解它的设计思想呢? 其实线程池的设计,采用的是生产者-消费者模式:
- 线程池的使用方是生产者
- 线程池本身是消费者
以下简化代码即可显示线程池的基本原理:
package com.javaedge.concurrency.example.threadpool; import com.google.common.collect.Lists; import lombok.SneakyThrows; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * 简化线程池 */ public class MyThreadPool { /** * 利用阻塞队列实现生产者-消费者模式 */ BlockingQueue<Runnable> workQueue; /** * 保存内部工作线程 */ List<WorkerThread> threads = Lists.newArrayList(); MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) { this.workQueue = workQueue; // 创建工作线程 for (int idx = 0; idx < poolSize; idx++) { WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } /** * 提交任务 */ void execute(Runnable command) throws InterruptedException { // 将任务加入到workQueue workQueue.put(command); } /** * 工作线程:负责消费任务,并执行任务 */ class WorkerThread extends Thread { @SneakyThrows @Override public void run() { // 消费workQueue中的任务并执行 while (true) { Runnable task = workQueue.take(); task.run(); } } } public static void main(String[] args) throws InterruptedException { // 创建有界阻塞队列 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2); // 创建线程池 MyThreadPool pool = new MyThreadPool(10, workQueue); // 提交任务 pool.execute(() -> System.out.println("hello")); } }
JDK线程池最核心的就是ThreadPoolExecutor,看名字,它强调的是Executor,并非一般的池化资源。
为什么都说要手动声明线程池?
虽然Executors工具类提供的方法可快速创建线程池如newFixedThreadPool,但阿里有话说:
弊端真严重吗,newFixedThreadPool=OOM?写段测试代码:
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); printPoolStatus(pool); for (int i = 0; i < LOOP; i++) { pool.execute(() -> { String payload = IntStream.rangeClosed(1, 1000000) .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString(); try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.getMessage(); } log.info(payload); }); }
执行不久,出现OOM
Exception in thread "http-nio-30666-ClientPoller" java.lang.OutOfMemoryError: GC overhead limit exceeded
newFixedThreadPool线程池的工作队列直接new了一个LinkedBlockingQueue:
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
但其默认构造器是一个Integer.MAX_VALUE长度的队列,所以很快Q满:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
虽然用其可固定工作线程数量,但任务队列几乎无界。若任务较多且执行较慢,队列就会快速积压,内存不够,易OOM。
newCachedThreadPool也等于OOM?
[11:30:30.487] [http-nio-30666-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause java.lang.OutOfMemoryError: unable to create new native thread
可见OOM是因为无法创建线程,newCachedThreadPool这种线程池的最大线程数是Integer.MAX_VALUE,可认为无上限,而其工作队列SynchronousQueue是个无存储空间的阻塞队列。
public class Executors { /** * 创建一个可根据需要创建新线程的线程池,但会在可能的情况下复用之前已创建的线程。 * * 这种类型的线程池通常能提升执行大量短生命周期异步任务的程序性能。 * * 当调用execute方法提交任务时,如果线程池中有空闲线程, * 则会复用这些线程;如果没有可用线程,则会创建一个新线程并加入线程池。 * * 超过 60 秒未被使用的线程将被终止并从缓存中移除。 * 因此,如果该线程池长时间处于空闲状态,将不会占用任何系统资源。 * * 注意:可通过ThreadPoolExecutor的构造方法 * 创建具有类似特性但细节不同(例如超时参数不同)的线程池。 * * @return 新创建的线程池实例 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } }
只要有请求到来,就必须找到一条工作线程处理,若当前无空闲线程就再创建一个新的。由于我们的任务需很长时间才能执行完成,大量任务进来后会创建大量线程。而线程是需要分配一定内存空间作为线程栈的,如1MB,因此无限创建线程必OOM。
所以用线程池,别抱任何侥幸,以为只是处理轻量任务,不会造成队列积压或创建大量线程!如某业务一旦接受到请求,就会调用外部服务,该外部服务接口正常100ms内会响应,现在TPS过百,CachedThreadPool能稳定在占用10个左右线程情况下满足需求。
可天有不测风云,该外部服务不可用了!而代码里调用该服务设置的超时又特别长, 比如1min,1min可能已经进成千上万请求,产生几千个任务,需几千个线程,没多久就因为无法再创建新线程,OOM!
所以阿里才不建议使用Executors:
- 要结合实际并发情况,评估线程池核心参数,确保其工作行为符合预期,关键的也就是设置有界工作队列和数量可控的线程数
- 永远要为自定义的线程池设置有意义名称,以便排查问题 因为当出现线程数量暴增、死锁、CPU负载高、线程执行异常等事故时,往往都需抓取线程栈。有意义的线程名称,就很重要。示例如下:
ExecutorService executor = new ThreadPoolExecutor( 50, 500, 60L, TimeUnit.SECONDS, // 创建有界队列 new LinkedBlockingQueue<>(capacity: 2000), // 根据业务需求实现ThreadFactory r -> new Thread(r, name: "echo-" + r.hashCode()), // 建议根据业务需求实现RejectedExecutionHandler new ThreadPoolExecutor.CallerRunsPolicy());
注意异常处理
通过ThreadPoolExecutor#execute()提交任务时,若任务在执行的过程中出现运行时异常,会导致 执行任务的线程 终止。 但要命的是,有时任务虽然异常了,但你却收不到任何通知,你还在开心摸鱼,以为任务都执行很正常。虽然线程池提供了很多用于异常处理的方法,但最稳妥和简单的方案还是捕获所有异常并具体处理:
try { //业务逻辑 } catch (RuntimeException x) { //按需处理 } catch (Throwable x) { //按需处理 }
线程管理
还好有谷歌,一般我们直接利用guava的ThreadFactoryBuilder实现线程池线程的自定义命名即可。
拒绝策略
线程池默认拒绝策略抛RejectedExecutionException运行时异常,IDEA不会强制捕获,所以也易忽略它。采用何种策略,具体看任务重要性:
- 不重要任务,可选直接丢弃
- 重要任务,可降级,如将任务信息插入DB或MQ,启用一个专门用作补偿的线程池去补偿处理。降级,即在服务无法正常提供功能的情况下,采取的补救措施
- 当线程数>核心线程数,线程等待keepAliveTime后还是无任务需要处理,收缩线程到核心线程数 了解这策略,有助根据实际容量规划需求,为线程池设置合适的初始化参数。也可通过一些手段来改变这些默认工作行为,如:
声明线程池后立即调用prestartAllCoreThreads,启动所有核心线程
/** * 启动所有核心线程,使它们处于空闲等待任务的状态。 * 这个方法会覆盖默认策略(默认情况下,核心线程只有在有新任务执行时才会启动)。 * * @return 已启动的核心线程数量 */ public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
传true给allowCoreThreadTimeOut,让线程池在空闲时同样回收核心线程
/** * 如果线程池允许核心线程在没有任务到达、空闲时间超过 keepAliveTime 后 * 超时终止,则返回 true。 * 当设置为 true 时,非核心线程的“存活时间”策略同样适用于核心线程; * 当为 false(默认值)时,核心线程不会因为长时间没有任务而被终止。 * * @return {@code true} 表示允许核心线程超时终止, * {@code false} 表示不允许 * * @since 1.6 */ public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; }
弹性伸缩的实现
线程池是先用Q存放来不及处理的任务,满后再扩容线程池。当Q设置很大时(那个 工具类),最大线程数这个参数就没啥意义了,因为队列很难满或到满时可能已OOM,更没机会去扩容线程池了。 是否能让线程池优先开启更多线程,而把Q当成后续方案?比如我们的任务执行很慢,需要10s,若线程池可优先扩容到5个最大线程,那么这些任务最终都可以完成,而不会因为线程池扩容过晚导致慢任务来不及处理。
难题在于:
- 线程池在工作队列满时,会扩容线程池 重写队列的offer,人为制造该队列满的条件
- 改变了队列机制,达到最大线程后势必要触发拒绝策略 实现一个自定义拒绝策略,这时再把任务真正插入队列
Tomcat就实现了类似的“弹性”线程池。
务必确认清楚线程池本身是不是复用的。
某服务偶尔报警线程数过多,但过一会儿又会降下来,但应用的请求量却变化不大。
可以在线程数较高时抓取线程栈,发现内存中有上千个线程池,这肯定不正常!
但代码里也没看到声明了线程池,最后发现原来是业务代码调用了一个类库:
ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool(); IntStream.rangeClosed(1, 10).forEach(i -> { threadPool.execute(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {} }); });
该类库竟然每次都创建一个新的线程池!
class ThreadPoolHelper { public static ThreadPoolExecutor getThreadPool() { // 线程池没有复用 return (ThreadPoolExecutor) Executors.newCachedThreadPool(); } }
newCachedThreadPool会在需要时创建必要数量的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程。如果业务操作并发量较大的话,的确有可能一下子开启几千个线程。
那为何监控中看到线程数量会下降,而不OOM?
newCachedThreadPool的核心线程数是0,而keepAliveTime是60s,所以60s后所有线程都可回收。
那这如何修复呢?
使用static字段存放线程池引用即可
class ThreadPoolHelper { private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, 50, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()); public static ThreadPoolExecutor getRightThreadPool() { return threadPoolExecutor; } }
线程池的意义在于复用,就意味着程序应该始终使用一个线程池吗?
不,具体场景具体分析。
比如一个 I/O 型任务,不断向线程池提交任务:向一个文件写入大量数据。线程池的线程基本一直处于忙碌状态,队列也基本满。而且由于是CallerRunsPolicy策略,所以当线程满队列满,任务会在提交任务的线程或调用execute方法的线程执行,所以不要认为提交到线程池的任务就一定会被异步处理。
毕竟,若使用CallerRunsPolicy,就有可能异步任务变同步执行。使用CallerRunsPolicy,当线程池饱和时,计算任务会在执行Web请求的Tomcat线程执行,这时就会进一步影响到其他同步处理的线程,甚至造成整个应用程序崩溃。
如何修正?
使用单独的线程池处理这种“I/O型任务”,将线程数设置多一些!
所以千万不要盲目复用别人写的线程池!因为它不一定适合你的任务!
Java 8的parallel stream
可方便并行处理集合中的元素,共享同一ForkJoinPool,默认并行度:``CPU核数-1`。对于CPU绑定的任务,使用这样的配置较合适,但若集合操作涉及同步I/O操作(如数据库操作、外部服务调用),建议自定义一个ForkJoinPool(或普通线程池)。
提交到相同线程池中的任务,一定要是相互独立的,最好不要有依赖关系!