线程池:业务代码常见的问题

简介: 线程池:业务代码常见的问题

线程池:业务代码常见的问题

在这里插入图片描述

在程序中,我们会使用各种池优化缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定策略调整池中缓存的对象数量,实现动态伸缩。


由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般优先考虑使用线程池来处理,而不是直接创建线程



1. 线程池的声明需要手动进行

Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。《阿里 巴巴 Java
开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor
来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典 型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

阿里巴巴文档:

在这里插入图片描述

测试 OOM问题 :

  • 来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提

交任务,每个任务都会创建一个比较大的字符串然后休眠一小时

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("开始执行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小时开始");
                try {
                    TimeUnit.HOURS.sleep(1);
                }catch (Exception e){
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1,TimeUnit.HOURS);
    }

结果:java.lang.OutOfMemoryError 错误

在这里插入图片描述

首先我们看下 newFixedThreadPool 方法的源码,发现,线程池的工作队列直接 new 了一个 LinkedBlockingQueue,

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

点进去查看 LinkedBlockingQueue构造方法 是一个 Integer.MAX_VALUE长度的队列,可以认为是无界的

 /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
虽然 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界但。如果任务较多并且执行较慢但话,队列可能会快速积压,撑爆内存导致OOM


测试newCachedThreadPool

如果我们把 newFixedThreadPool 改成 newCachedThreadPool方法来获取线程池。程序运行不久后,同样会看到 OOM 异常

java.lang.OutOfMemoryError: unable to create new native thread

源码:

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

这种线程池的最大线程数是 Integer.MAX_VALUE ,认为是没有上限的,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。由于我们的任务需要一小时才能完成,大量的任务进来后会创建大量的线程,我们知道线程是分配一定的内存空间做为线程栈,比如 1MB,因此无限创建线程必然会导致OOM


我们不建议使用 Executors 提供的两种快捷的线程池,原因如下:
  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合要求,一般都需要设置有界的工作队列和 可控的线程数。
  • 任何时候,都于根伟自定义线程池指定有意思的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU 、线程执行出现异常等问题时,我们往往会抓取线程栈,此时,有意义的线程名称,就可以方便我们定位问题。


总结 线程池工作行为:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

2.确认线程池是否在复用

  • 在生产环境中,监控一直报警当前使用线程数太多,一会又将下来,但是当前用户访问量也不是很大

通过代码排查 发现项目中使用了 Executors.newCachedThreadPool(); 创建线程池使用,我们知道newCachedThreadPool 会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程,如果业务操作量较大,的确有可能一下子开启几千个线程

源码发现

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 它的核心线程数是0,而最大线程数 Integer的最大值,一般来说机器都没那么大内存给它不断使用,而 keepAliveTime 是60秒,也就是在 60秒之后所有的线程都是可以回收的,采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

所以我们在使用线程池要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:
: 1. 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队
列。
: 2. 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理
由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过
多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做
缓冲。

相关文章
|
4月前
多线程案例-定时器(附完整代码)
多线程案例-定时器(附完整代码)
279 0
|
4月前
|
数据采集 Python
【Python自动化】多线程BFS站点结构爬虫代码,支持中断恢复,带注释
【Python自动化】多线程BFS站点结构爬虫代码,支持中断恢复,带注释
55 0
|
4月前
|
Java
|
2月前
|
安全 Python
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
44 3
|
4月前
|
设计模式 监控 Java
Java多线程基础-11:工厂模式及代码案例之线程池(一)
本文介绍了Java并发框架中的线程池工具,特别是`java.util.concurrent`包中的`Executors`和`ThreadPoolExecutor`类。线程池通过预先创建并管理一组线程,可以提高多线程任务的效率和响应速度,减少线程创建和销毁的开销。
96 2
|
4月前
|
安全 Java
Java多线程基础-10:代码案例之定时器(一)
`Timer` 是 Java 中的一个定时器类,用于在指定延迟后执行指定的任务。它常用于实现定时任务,例如在网络通信中设置超时或定期清理数据。`Timer` 的核心方法是 `schedule()`,它可以安排任务在延迟一段时间后执行。`
91 1
|
18天前
|
Java Windows
【Azure Developer】Windows中通过pslist命令查看到Java进程和线程信息,但为什么和代码中打印出来的进程号不一致呢?
【Azure Developer】Windows中通过pslist命令查看到Java进程和线程信息,但为什么和代码中打印出来的进程号不一致呢?
|
17天前
|
Java 开发者
解锁Java并发编程的秘密武器!揭秘AQS,让你的代码从此告别‘锁’事烦恼,多线程同步不再是梦!
【8月更文挑战第25天】AbstractQueuedSynchronizer(AQS)是Java并发包中的核心组件,作为多种同步工具类(如ReentrantLock和CountDownLatch等)的基础。AQS通过维护一个表示同步状态的`state`变量和一个FIFO线程等待队列,提供了一种高效灵活的同步机制。它支持独占式和共享式两种资源访问模式。内部使用CLH锁队列管理等待线程,当线程尝试获取已持有的锁时,会被放入队列并阻塞,直至锁被释放。AQS的巧妙设计极大地丰富了Java并发编程的能力。
26 0
|
3月前
|
Java
【代码诗人】Java线程的生与死:一首关于生命周期的赞歌!
【6月更文挑战第19天】Java线程生命周期,如诗般描绘了从新建到死亡的旅程:创建后待命,`start()`使其就绪,获得CPU则运行,等待资源则阻塞,任务完或中断即死亡。理解生命周期,善用锁、线程池,优雅处理异常,确保程序高效稳定。线程管理,既是艺术,也是技术。
24 3
|
3月前
|
API
linux---线程互斥锁总结及代码实现
linux---线程互斥锁总结及代码实现