由于不知道Java线程池的bug,某程序员叕被祭天

简介: 本文深入解析线程池原理与最佳实践,涵盖ThreadPoolExecutor核心机制、Executors潜在风险(OOM)、拒绝策略、异常处理及弹性伸缩实现。强调手动创建线程池的重要性,需设置有界队列、可控线程数,并命名线程以利排查问题。警惕CachedThreadPool无限创建线程,避免因复用不当导致性能隐患。

本文已收录在Github关注我,紧跟本系列专栏文章,咱们下篇再续!

  • 🚀 魔都架构师 | 全网30W技术追随者
  • 🔧 大厂分布式系统/数据中台实战专家
  • 🏆 主导交易系统百万级流量调优 & 车联网平台架构
  • 🧠 AIGC应用开发先行者 | 区块链落地实践者
  • 🌍 以技术驱动创新,我们的征途是改变世界!
  • 👉 实战干货:编程严选网

说说你对线程池的理解?

首先明确,池化的意义在于缓存,创建性能开销较大的对象,比如线程池、连接池、内存池。预先在池里创建一些对象,使用时直接取,用完就归还复用,使用策略调整池中缓存对象的数量。

Java创建对象,仅是在JVM堆分块内存,但创建一个线程,却需调用os内核API,然后os要为线程分配一系列资源,成本很高,所以线程是一个重量级对象,应避免频繁创建或销毁。 既然这么麻烦,就要避免呀,所以要使用线程池!

一般池化资源,当你需要资源时,就调用申请线程方法申请资源,用完后调用释放线程方法释放资源。但JDK的线程池根本没有申请线程和释放线程的方法。

那到底该如何理解它的设计思想呢? 其实线程池的设计,采用的是生产者-消费者模式

  • 线程池的使用方是生产者
  • 线程池本身是消费者

以下简化代码即可显示线程池的基本原理:

package com.javaedge.concurrency.example.threadpool;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * 简化线程池
 */
public class MyThreadPool {
    /**
     * 利用阻塞队列实现生产者-消费者模式
     */
    BlockingQueue<Runnable> workQueue;
    /**
     * 保存内部工作线程
     */
    List<WorkerThread> threads = Lists.newArrayList();
    MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        // 创建工作线程
        for (int idx = 0; idx < poolSize; idx++) {
            WorkerThread work = new WorkerThread();
            work.start();
            threads.add(work);
        }
    }
    /**
     * 提交任务
     */
    void execute(Runnable command) throws InterruptedException {
        // 将任务加入到workQueue
        workQueue.put(command);
    }
    /**
     * 工作线程:负责消费任务,并执行任务
     */
    class WorkerThread extends Thread {
        @SneakyThrows
        @Override
        public void run() {
            // 消费workQueue中的任务并执行
            while (true) {
                Runnable task = workQueue.take();
                task.run();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        // 创建有界阻塞队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
        // 创建线程池
        MyThreadPool pool = new MyThreadPool(10, workQueue);
        // 提交任务
        pool.execute(() -> System.out.println("hello"));
    }
}

JDK线程池最核心的就是ThreadPoolExecutor,看名字,它强调的是Executor,并非一般的池化资源。

为什么都说要手动声明线程池?

虽然Executors工具类提供的方法可快速创建线程池如newFixedThreadPool,但阿里有话说:

弊端真严重吗,newFixedThreadPool=OOM?写段测试代码:

ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printPoolStatus(pool);
for (int i = 0; i < LOOP; i++) {
    pool.execute(() -> {
        String payload = IntStream.rangeClosed(1, 1000000)
                .mapToObj(__ -> "a")
                .collect(Collectors.joining("")) + UUID.randomUUID().toString();
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.getMessage();
        }
        log.info(payload);
    });
}

执行不久,出现OOM

Exception in thread "http-nio-30666-ClientPoller" 
 java.lang.OutOfMemoryError: GC overhead limit exceeded

newFixedThreadPool线程池的工作队列直接new了一个LinkedBlockingQueue:

public class Executors {
  
  public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
  }

但其默认构造器是一个Integer.MAX_VALUE长度的队列,所以很快Q满:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

虽然用其可固定工作线程数量,但任务队列几乎无界。若任务较多且执行较慢,队列就会快速积压,内存不够,易OOM。

newCachedThreadPool也等于OOM?

[11:30:30.487] [http-nio-30666-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause
java.lang.OutOfMemoryError: unable to create new native thread

可见OOM是因为无法创建线程,newCachedThreadPool这种线程池的最大线程数是Integer.MAX_VALUE,可认为无上限,而其工作队列SynchronousQueue是个无存储空间的阻塞队列。

public class Executors {
    /**
     * 创建一个可根据需要创建新线程的线程池,但会在可能的情况下复用之前已创建的线程。
     * 
     * 这种类型的线程池通常能提升执行大量短生命周期异步任务的程序性能。
     * 
     * 当调用execute方法提交任务时,如果线程池中有空闲线程,
     * 则会复用这些线程;如果没有可用线程,则会创建一个新线程并加入线程池。
     * 
     * 超过 60 秒未被使用的线程将被终止并从缓存中移除。
     * 因此,如果该线程池长时间处于空闲状态,将不会占用任何系统资源。
     * 
     * 注意:可通过ThreadPoolExecutor的构造方法
     * 创建具有类似特性但细节不同(例如超时参数不同)的线程池。
     *
     * @return 新创建的线程池实例
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}

只要有请求到来,就必须找到一条工作线程处理,若当前无空闲线程就再创建一个新的。由于我们的任务需很长时间才能执行完成,大量任务进来后会创建大量线程。而线程是需要分配一定内存空间作为线程栈的,如1MB,因此无限创建线程必OOM。

所以用线程池,别抱任何侥幸,以为只是处理轻量任务,不会造成队列积压或创建大量线程!如某业务一旦接受到请求,就会调用外部服务,该外部服务接口正常100ms内会响应,现在TPS过百,CachedThreadPool能稳定在占用10个左右线程情况下满足需求。

可天有不测风云,该外部服务不可用了!而代码里调用该服务设置的超时又特别长, 比如1min,1min可能已经进成千上万请求,产生几千个任务,需几千个线程,没多久就因为无法再创建新线程,OOM!

所以阿里才不建议使用Executors:

  • 要结合实际并发情况,评估线程池核心参数,确保其工作行为符合预期,关键的也就是设置有界工作队列和数量可控的线程数
  • 永远要为自定义的线程池设置有意义名称,以便排查问题 因为当出现线程数量暴增、死锁、CPU负载高、线程执行异常等事故时,往往都需抓取线程栈。有意义的线程名称,就很重要。示例如下:
ExecutorService executor = new ThreadPoolExecutor(
    50, 500, 60L, TimeUnit.SECONDS,
    // 创建有界队列
    new LinkedBlockingQueue<>(capacity: 2000),
    // 根据业务需求实现ThreadFactory
    r -> new Thread(r, name: "echo-" + r.hashCode()),
    // 建议根据业务需求实现RejectedExecutionHandler
    new ThreadPoolExecutor.CallerRunsPolicy());

注意异常处理

通过ThreadPoolExecutor#execute()提交任务时,若任务在执行的过程中出现运行时异常,会导致 执行任务的线程 终止。 但要命的是,有时任务虽然异常了,但你却收不到任何通知,你还在开心摸鱼,以为任务都执行很正常。虽然线程池提供了很多用于异常处理的方法,但最稳妥和简单的方案还是捕获所有异常并具体处理:

try {
    //业务逻辑
} catch (RuntimeException x) {
    //按需处理
} catch (Throwable x) {
    //按需处理
}

线程管理

还好有谷歌,一般我们直接利用guava的ThreadFactoryBuilder实现线程池线程的自定义命名即可。

拒绝策略

线程池默认拒绝策略抛RejectedExecutionException运行时异常,IDEA不会强制捕获,所以也易忽略它。采用何种策略,具体看任务重要性:

  • 不重要任务,可选直接丢弃
  • 重要任务,可降级,如将任务信息插入DB或MQ,启用一个专门用作补偿的线程池去补偿处理。降级,即在服务无法正常提供功能的情况下,采取的补救措施
  • 当线程数>核心线程数,线程等待keepAliveTime后还是无任务需要处理,收缩线程到核心线程数 了解这策略,有助根据实际容量规划需求,为线程池设置合适的初始化参数。也可通过一些手段来改变这些默认工作行为,如:

声明线程池后立即调用prestartAllCoreThreads,启动所有核心线程

/**
 * 启动所有核心线程,使它们处于空闲等待任务的状态。
 * 这个方法会覆盖默认策略(默认情况下,核心线程只有在有新任务执行时才会启动)。
 *
 * @return 已启动的核心线程数量
 */
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

传true给allowCoreThreadTimeOut,让线程池在空闲时同样回收核心线程

/**
 * 如果线程池允许核心线程在没有任务到达、空闲时间超过 keepAliveTime 后
 * 超时终止,则返回 true。  
 * 当设置为 true 时,非核心线程的“存活时间”策略同样适用于核心线程;
 * 当为 false(默认值)时,核心线程不会因为长时间没有任务而被终止。
 *
 * @return {@code true} 表示允许核心线程超时终止,
 *         {@code false} 表示不允许
 *
 * @since 1.6
 */
public boolean allowsCoreThreadTimeOut() {
    return allowCoreThreadTimeOut;
}

弹性伸缩的实现

线程池是先用Q存放来不及处理的任务,满后再扩容线程池。当Q设置很大时(那个 工具类),最大线程数这个参数就没啥意义了,因为队列很难满或到满时可能已OOM,更没机会去扩容线程池了。 是否能让线程池优先开启更多线程,而把Q当成后续方案?比如我们的任务执行很慢,需要10s,若线程池可优先扩容到5个最大线程,那么这些任务最终都可以完成,而不会因为线程池扩容过晚导致慢任务来不及处理。

难题在于:

  • 线程池在工作队列满时,会扩容线程池 重写队列的offer,人为制造该队列满的条件
  • 改变了队列机制,达到最大线程后势必要触发拒绝策略 实现一个自定义拒绝策略,这时再把任务真正插入队列

Tomcat就实现了类似的“弹性”线程池。

务必确认清楚线程池本身是不是复用的

某服务偶尔报警线程数过多,但过一会儿又会降下来,但应用的请求量却变化不大。

可以在线程数较高时抓取线程栈,发现内存中有上千个线程池,这肯定不正常!

但代码里也没看到声明了线程池,最后发现原来是业务代码调用了一个类库:

ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
IntStream.rangeClosed(1, 10).forEach(i -> {
    threadPool.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {}
    });
});

该类库竟然每次都创建一个新的线程池!

class ThreadPoolHelper {
    public static ThreadPoolExecutor getThreadPool() {
        // 线程池没有复用
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();
    }
}

newCachedThreadPool会在需要时创建必要数量的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程。如果业务操作并发量较大的话,的确有可能一下子开启几千个线程。

那为何监控中看到线程数量会下降,而不OOM?

newCachedThreadPool的核心线程数是0,而keepAliveTime是60s,所以60s后所有线程都可回收。

那这如何修复呢?

使用static字段存放线程池引用即可

class ThreadPoolHelper {
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        10,
        50,
        2, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1000),
        new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());
    public static ThreadPoolExecutor getRightThreadPool() {
        return threadPoolExecutor;
    }
}

线程池的意义在于复用,就意味着程序应该始终使用一个线程池吗?

不,具体场景具体分析。

比如一个 I/O 型任务,不断向线程池提交任务:向一个文件写入大量数据。线程池的线程基本一直处于忙碌状态,队列也基本满。而且由于是CallerRunsPolicy策略,所以当线程满队列满,任务会在提交任务的线程或调用execute方法的线程执行,所以不要认为提交到线程池的任务就一定会被异步处理

毕竟,若使用CallerRunsPolicy,就有可能异步任务变同步执行。使用CallerRunsPolicy,当线程池饱和时,计算任务会在执行Web请求的Tomcat线程执行,这时就会进一步影响到其他同步处理的线程,甚至造成整个应用程序崩溃。

如何修正?

使用单独的线程池处理这种“I/O型任务”,将线程数设置多一些!

所以千万不要盲目复用别人写的线程池!因为它不一定适合你的任务!

Java 8的parallel stream

可方便并行处理集合中的元素,共享同一ForkJoinPool,默认并行度:``CPU核数-1`。对于CPU绑定的任务,使用这样的配置较合适,但若集合操作涉及同步I/O操作(如数据库操作、外部服务调用),建议自定义一个ForkJoinPool(或普通线程池)。

提交到相同线程池中的任务,一定要是相互独立的,最好不要有依赖关系!

目录
相关文章
|
10月前
|
人工智能 Kubernetes Java
回归开源,两位 Java 和 Go 程序员分享的开源贡献指引
Higress是一个基于Istio和Envoy的云原生API网关,支持AI功能扩展。它通过Go/Rust/JS编写的Wasm插件提供可扩展架构,并包含Node和Java的console模块。Higress起源于阿里巴巴,解决了Tengine配置重载及gRPC/Dubbo负载均衡问题,现已成为阿里云API网关的基础。本文介绍Higress的基本架构、功能(如AI网关、API管理、Ingress流量网关等)、部署方式以及如何参与开源贡献。此外,还提供了有效的开源贡献指南和社区交流信息。
1149 33
|
10月前
|
Java 程序员 应用服务中间件
【高薪程序员必看】万字长文拆解Java并发编程!(2 2-2)
📌 核心痛点暴击:1️⃣ 面了8家都被问synchronized锁升级?一张图看懂偏向锁→重量级锁全过程!2️⃣ 线程池参数不会配?高并发场景下这些参数调优救了项目组命!3️⃣ volatile双重检测单例模式到底安不安全?99%人踩过的内存可见性大坑!💡 独家亮点抢先看:✅ 图解JVM内存模型(JMM)三大特性,看完再也不怕指令重排序✅ 手撕ReentrantLock源码,AQS队列同步器实现原理大揭秘✅ 全网最细线程状态转换图(附6种状态转换触发条件表)
184 0
|
10月前
|
存储 缓存 Java
【高薪程序员必看】万字长文拆解Java并发编程!(5):深入理解JMM:Java内存模型的三大特性与volatile底层原理
JMM,Java Memory Model,Java内存模型,定义了主内存,工作内存,确保Java在不同平台上的正确运行主内存Main Memory:所有线程共享的内存区域,所有的变量都存储在主存中工作内存Working Memory:每个线程拥有自己的工作内存,用于保存变量的副本.线程执行过程中先将主内存中的变量读到工作内存中,对变量进行操作之后再将变量写入主内存,jvm概念说明主内存所有线程共享的内存区域,存储原始变量(堆内存中的对象实例和静态变量)工作内存。
309 0
|
10月前
|
设计模式 缓存 安全
【高薪程序员必看】万字长文拆解Java并发编程!(8):设计模式-享元模式设计指南
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的经典对象复用设计模式-享元模式,废话不多说让我们直接开始。
215 0
|
10月前
|
存储 安全 Java
【高薪程序员必看】万字长文拆解Java并发编程!(7):不可变类设计指南
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中Java不可变类设计指南,废话不多说让我们直接开始。
194 0
|
10月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
389 0
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
607 60
【Java并发】【线程池】带你从0-1入门线程池
|
10月前
|
存储 监控 算法
Java程序员必学:JVM架构完全解读
Java 虚拟机(JVM)是 Java 编程的核心,深入理解其架构对开发者意义重大。本文详细解读 JVM 架构,涵盖类加载器子系统、运行时数据区等核心组件,剖析类加载机制,包括加载阶段、双亲委派模型等内容。阐述内存管理原理,介绍垃圾回收算法与常见回收器,并结合案例讲解调优策略。还分享 JVM 性能瓶颈识别与调优方法,分析 Java 语言特性对性能的影响,给出数据结构选择、I/O 操作及并发同步处理的优化技巧,同时探讨 JVM 安全模型与错误处理机制,助力开发者提升编程能力与程序性能。
Java程序员必学:JVM架构完全解读
|
11月前
|
人工智能 Java 程序员
Java程序员在AI时代必会的技术:Spring AI
在AI时代,Java程序员需掌握Spring AI技术以提升竞争力。Spring AI是Spring框架在AI领域的延伸,支持自然语言处理、机器学习集成与自动化决策等场景。它简化开发流程,无缝集成Spring生态,并提供对多种AI服务(如OpenAI、阿里云通义千问)的支持。本文介绍Spring AI核心概念、应用场景及开发步骤,含代码示例,助你快速入门并构建智能化应用,把握AI时代的机遇。
2264 61
|
人工智能 Java 程序员
【AI程序员】通义灵码 AI 程序员全面上线JAVA使用体验
通过 AI 程序编写一个JAVA后台项目登陆页面
884 42