速读Java线程池

简介: 线程池是开发中绕不开的一个知识点 。 对于移动开发而言,网络框架、图片加载、AsyncTask、RxJava, 都和线程池有关。 正因为线程池应用如此广泛,所以也成了面试的高频考点。

一、前言

线程池是开发中绕不开的一个知识点 。
对于移动开发而言,网络框架、图片加载、AsyncTask、RxJava, 都和线程池有关。
正因为线程池应用如此广泛,所以也成了面试的高频考点。

我们今天就来讲讲线程池的基本原理和周边知识。
先从线程的生命周期开始。

二、线程生命周期

线程是程序执行流的最小单元。
Java线程可分为五个阶段:

  • 新建(New): 创建Thread对象,并且未调用start();
  • 就绪(Runnable): 调用start()之后, 等待操作系统调度;
  • 运行(Running): 获取CPU时间分片,执行 run()方法中的代码;
  • 阻塞(Blocked): 线程让出CPU,进入等待(就绪);
  • 终止(Terminated): 自然退出或者被终止。

线程的创建和销毁代价较高,当有大量的任务时,可复用线程,以提高执行任务的时间占比。
如上图,不断地 Runnable->Runing->Blocked->Runnable, 就可避免过多的线程创建和销毁。
此外,线程的上下文切换也是开销比较大的,若要使用线程池,需注意设置合理的参数,控制线程并发。

三、ThreadPoolExecutor

JDK提供了一个很好用的线程池的封装:ThreadPoolExecutor

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize:核心线程大小
maximumPoolSize:线程池最大容量(需大于等于corePoolSize,否则会抛异常)
keepAliveTime:线程执行任务结束之后的存活时间
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂
handler:拒绝策略

线程池中有两个任务容器:

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;

前者用于存储Worker,后者用于缓冲任务(Runnable)。
下面是execute方法的简要代码:

    public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
        }
        // 若workQueue已满,offer会返回false
        if (isRunning(c) && workQueue.offer(command)) {
            // ...
        } else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        int wc = workerCountOf(c);
        if (wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        workers.add(w);
        t.start();
    }

一个任务到来,假设此时容器workers中Worker数的数量为c,则

  • 1、当c < corePoolSize时,创建Worker来执行这个任务,并放入workers
  • c >= corePoolSize时,

    • 2、若workQueue未满,则将任务放入workQueue
    • workQueue已满,

      • 3、若c < maximumPoolSize,创建Worker来执行这个任务,并放入workers
      • 4、若c >= maximumPoolSize, 执行拒绝策略。

很多人在讲线程池的时候,干脆把workers说成“线程池”,将Worker和线程混为一谈;
不过这也无妨,能帮助理解就好,就像看到一杯水,说“这是水”一样,很少人会说这是“杯子装着水”。

Worker和线程,好比汽车和引擎:汽车装着引擎,汽车行驶,其实是引擎在做功。
Worker本身实现了Runnable,然后有一个Thread和Runnable的成员;
构造函数中,将自身(this)委托给自己的成员thread
thread.start(), Worker的run()函数被回调,从而开启 “执行任务-获取任务”的轮回。

    private final class Worker implements Runnable{
        final Thread thread;
        Runnable firstTask;
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
    }

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            task.run();
        }
    }

    private Runnable getTask() {
        for (;;) {
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }

当线程执行完任务(task.run()结束),会尝试去workQueue取下一个任务,
如果workQueue已经清空,则线程进入阻塞态:workQueue是阻塞队列,如果取不到元素会block当前线程。
此时,allowCoreThreadTimeOuttrue, 或者 n > corePoolSize,workQueue等待keepAliveTime的时间,
如果时间到了还没有任务进来, 则退出循环, 线程销毁;
否则,一直等待,直到新的任务到来(或者线程池关闭)。
这就是线程池可以保留corePoolSize个线程存活的原理。

从线程的角度,要么执行任务,要么阻塞等待,或者销毁;
从任务的角度,要么马上被执行,要么进入队列等待被执行,或者被拒绝执行。
上图第2步,任务进入workQueue, 如果队列为空且有空闲的Worker的话,可马上得到执行。

关于workQueue,常用的有两个队列:

  • LinkedBlockingQueue(capacity):
    传入capacity(大于0), 则LinkedBlockingQueue的容量为capacity;

如果不传,默认为Integer.MAX_VALUE,相当于无限容量(不考虑内存因素),多少元素都装不满。

  • SynchronousQueue
    除非另一个线程试图获取元素,否则不能添加元素。

四、 ExecutorService

为了方便使用,JDK还封装了一些常用的ExecutorService:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}
类型 最大并发 适用场景
newFixedThreadPool nThreads 计算密集型任务
newSingleThreadExecutor 1 串行执行的任务
newCachedThreadPool Integer.MAX_VALUE IO密集型任务
newScheduledThreadPool Integer.MAX_VALUE 定时任务,周期任务

newSingleThreadExecutor 其实是 newFixedThreadPool的特例 (nThreads=1),
写日志等任务,比较适合串行执行,一者不会占用太多资源,二者为保证日志有序与完整,同一时间一个线程写入即可。

众多方法中,newCachedThreadPool() 是比较特别的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 为 SynchronousQueue。

结合上一节的分析:
当一个任务提交过来,由于corePoolSize = 0,任务会尝试放入workQueue;
如果没有线程在尝试从workQueue获取任务,offer()会返回false,然后会创建线程执行任务;
如果有空闲线程在等待任务,任务可以放进workQueue,但是放进去后马上就被等待任务的线程取走执行了。
总的来说,就是有空闲线程则交给空闲线程执行,没有则创建线程执行;
SynchronousQueue类型workQueue并不保存任务,只是一个传递者。
所以,最终效果为:所有任务立即调度,无容量限制,无并发限制。

这样的特点比较适合网络请求任务。
OkHttp的异步请求所用线程池与此类似(除了ThreadFactory ,其他参数一模一样)。

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

五、 任务并发的估算

一台设备上,给定一批任务,要想最快时间完成所有任务,并发量应该如何控制?
并发量太小,CPU利用率不高;
并发量太大,CPU 满负荷,但是花在线程切换的时间增加,用于执行任务的时间反而减少。

一些文章提到如下估算公式:

M:并发数;
C:任务占用CPU的时间;
I:等待IO完成的时间(为简化讨论,且只考虑IO);
N:CPU核心数。

代入特定参数验证这条公式:
1、比方说 I 接近于0,则M≈N,一个线程对应一个CPU,刚好满负荷且较少线程切换;
2、假如 I=C,则M = 2N,两个线程对应一个CPU,每个线程一半时间在等待IO,一半时间在计算,也是刚好。

遗憾的是,对于APP而言这条公式并不适用:

  • 任务占用CPU时间和IO时间无法估算
    APP上的异步任务通常是碎片化的,而不同的任务性质不一样,有的计算耗时多,有的IO耗时多;

然后同样是IO任务,比方说网络请求,IO时间也是不可估计的(受服务器和网速影响)。

  • 可用CPU核心可能会变化
    有的设备可能会考虑省电或者热量控制而关闭一些核心;

大家经常吐槽的“一核有难,九核围观”映射的就是这种现象。

虽然该公式不能直接套用来求解最大并发,但仍有一些指导意义:
**IO等待时间较多,则需要高的并发,来达到高的吞吐率;
CPU计算部分较多,则需要降低并发,来提高CPU的利用率。**

换言之,就是:
计算密集型任务时控制并发小一点;
IO密集型任务时控制并发大一点。

问题来了,小一点是多小,大一点又是多大呢?
说实话这个只能凭经验了,跟“多吃水果”,“加盐少许”一样,看实际情况而定。

比如RxJava就提供了Schedulers.computation()Schedulers.io()
前者默认情况下为最大并发为CPU核心数,后者最大并发为Integer.MAX_VALUE(相当于不限制并发)。
可能是作者也不知道多少才合适,所以干脆就不限制了。
这样其实很危险的,JVM对进程有最大线程数限制,超过则会抛OutOfMemoryError

六、总结

回顾文章的内容,大概有这些点:

  • 介绍了线程的生命周期;
  • 从线程池的参数入手,分析这些参数是如何影响线程池的运作;
  • 列举常用的ExecutorService,介绍其各自特点和适用场景;
  • 对并发估算的一些理解。

文章没有对Java线程池做太过深入的探讨,而是从使用的角度讲述基本原理和周边知识;
第二节有结合关键代码作简要分析,也是点到为止,目的在于加深对线程池相关参数的理解,
以便在平时使用线程池的时候合理斟酌,在阅读涉及线程池的开源代码时也能“知其所以然”。

相关文章
|
1月前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
4月前
|
Java 调度 数据库
Java并发编程:深入理解线程池
在Java并发编程的海洋中,线程池是一艘强大的船,它不仅提高了性能,还简化了代码结构。本文将带你潜入线程池的深海,探索其核心组件、工作原理及如何高效利用线程池来优化你的并发应用。
|
4月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
129 1
|
4月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
4月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
19天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
15天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
112 38
|
22天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
91 4
下一篇
DataWorks