Java多线程-死磕ThreadPoolExecutor线程池

简介: Java线程池ThreadPoolExecutor基本实现原理

title: 死磕ThreadPoolExecutor线程池
date: 2020-05-14 15:53:00
categories: Java,ThreadPoolExecutor

description: Java多线程

Java-big

thread-pool-1

1. 线程池的优势

  • 节省资源开销:重复利用线程池中的线程,不需要每次都创建
  • 提升对线程的管理能力:统一对线程分配和监控,避免无限创建,造成资源内存溢出和CPU耗尽
  • 提高响应,降低系统开销:减少了创建线程的时间消耗,提高应对任务的响应

线程空间大小

线程空间大小和具体JDK版本有很大关系,JDK8将近1.9M、JDK11差不多1.5M多。具体大小的查看可以执行命令java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -version

C:\Users\WONGS> java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -version
java version "11.0.2" 2019-01-15 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.2+9-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.2+9-LTS, mixed mode)

Native Memory Tracking:

Total: reserved=7849030KB, committed=465994KB
-                 Java Heap (reserved=6248448KB, committed=391168KB)
                            (mmap: reserved=6248448KB, committed=391168KB)

-                     Class (reserved=1056866KB, committed=4578KB)
                            (classes #472)
                            (  instance classes #407, array classes #65)
                            (malloc=98KB #502)
                            (mmap: reserved=1056768KB, committed=4480KB)
                            (  Metadata:   )
                            (    reserved=8192KB, committed=4096KB)
                            (    used=3120KB)
                            (    free=976KB)
                            (    waste=0KB =0.00%)
                            (  Class space:)
                            (    reserved=1048576KB, committed=384KB)
                            (    used=297KB)
                            (    free=87KB)
                            (    waste=0KB =0.00%)

-                    Thread (reserved=16455KB, committed=591KB)
                            (thread #16)
                            (stack: reserved=16384KB, committed=520KB)
                            (malloc=52KB #89)
                            (arena=19KB #30)
......

2. 几种常见线程池

  • newCachedThreadPool:数量无上限,该线程池会根据需要创建,但是优先使用之前构造的线程。这些池通常将提高执行许多短期异步任务的程序的性能,如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。当60S内未使用的线程将被终止并从缓存中删除。 因此,保持空闲时间足够长的池不会消耗任何资源。

构造函数

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  • newFixedThreadPool:数量固定大小,该线程池重用在共享的无边界队列上运行的固定数量的线程。在任何时候,最多nThreads(构造函数的参数,核心线程数与最大线程数相等)个线程都是活动的处理任务。 如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。 如果在关闭之前执行过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。 池中的线程将一直存在,直到明确将其关闭。

构造函数

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor:单线程,保证提交线程执行任务的FIFO, (但是请注意,如果该单个线程由于在关闭前执行期间由于执行失败而终止,则在需要执行新任务时将使用新线程代替。),在任何给定时间活动的任务不超过一个。与newFixedThreadPool不同,保证返回的执行程序不可重新配置为使用其他线程。

构造函数

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • newScheduledThreadPool:该线程池可以安排命令在给定的延迟后运行或定期执行。

构造函数

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPoolExecutor类结构图

通过类图,我们分析,其实 ScheduledThreadPoolExecutorThreadPoolExecutor 子类。

  • newWorkStealingPool:使用所有可用处理器作为目标并行度级别,创建工作窃取线程池,在实际中很少使用,不细讲。

综上所述,我们可以看到这些线程池底层实现都依靠 ThreadPoolExecutor 类的构造器,它是构造线程池的核心实现。但是现实在开发过程中避免利用 Executors 去创建线程池,这容易让人疑惑,JDK命名自带实现,为什么避免用,看完下一章节后,我们再谈这个话题。

3. 解析ThreadPoolExecutor

  • corePoolSize:初始化大小,即使没有空闲,保留在池中的线程数,除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:允许线程池同时并行的线程数量
  • keepAliveTime:当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间
  • unit:TimeUnit类型,这没什么好说
  • workQueue:在执行任务之前用于保留任务的队列,此队列将仅保存execute方法提交的Runnable任务。
  • threadFactory:执行程序创建新线程时要使用的工厂
  • handler:当线等待队列中的数量超过既定容量,所需要处理策略

构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          @NotNull TimeUnit unit,
                          @NotNull BlockingQueue<Runnable> workQueue,
                          @NotNull ThreadFactory threadFactory,
                          @NotNull RejectedExecutionHandler handler)

ThreadPoolExecutor结构类图

3.1. corePoolSize、maximumPoolSize

corePoolSize、maximumPoolSize 线程池中初始化的线程数量,初始化太多或者太少,都有可能造成资源的浪费,具体实际情况根据所需要处理的任务特征决定。

3.2. workQueue

将待处理的任务放入一个队列,这是一个阻塞队列,该队列可以是有界也可以是无界。

3.3. handler

  • AbortPolicy:拒绝执行处理程序,这是默认策略。
  • CallerRunsPolicy:线程池未关闭,被拒绝任务,它直接在调用线程中运行被丢弃的任务
  • DiscardOldestPolicy:丢弃最老,然后重试执行当前任务
  • DiscardPolicy:比较粗暴,直接丢弃。

其实上述四种策略都不够友好,在实际应用场景中,肯定要记录日志或者通过RPC框架触发通知补偿措施,否则会造成数据丢失或者处理过程不够严谨。一般情况下,我们需要自己实现 RejectedExecutionHandler 接口,在接口中记录日志或者持久化不能处理的任务信息。再通过定时任务,进行补偿重试。

3.4. 线程池执行顺序

线程池执行逻辑

  • 判断核心线程数(corePoolSize)是否已满,未满则创建核心线程,用来执行任务
  • 判断 workQueue 队列类型是否是有界队列,如果 否,则maximumPoolSize 参数的配置无效;当是有界队列,则判断有界队列是否已满
  • 判断有界队列是否已满,当队列还有空间,还要进一步判定线程池是否已满即线程池中线程数量是否已达到maximumPoolSize,未超过则直接创建非核心线程;超过则根据拒绝策略执行相关操作

下面将举几个例子。

3.4.1. 无界队列样例

定义核心线程数,corePoolSize 为 1;无界队列LinkedBlockingQueue同时为展示更好的效果,我让每个线程执行后都sleep 秒钟。

public class ThreadPoolExecutorDemo2 {

    public static ExecutorService getExecutorService(int coreSize,int maxSize,BlockingQueue queue){
        RejectedExecutionHandler policy = new ThreadPoolExecutor.AbortPolicy();
        return new ThreadPoolExecutor(coreSize, maxSize,0, TimeUnit.SECONDS, queue, policy);
    }

    public static void main(String[] args) {
//        ArrayBlockingQueue queue = new ArrayBlockingQueue(3);
        LinkedBlockingQueue queue = new LinkedBlockingQueue();
        ExecutorService es = getExecutorService(1,5,queue);

        ThreadTaskDemo t1 = new ThreadTaskDemo("t1");
        ThreadTaskDemo t2 = new ThreadTaskDemo("t2");
        ThreadTaskDemo t3 = new ThreadTaskDemo("t3");
        ThreadTaskDemo t4 = new ThreadTaskDemo("t4");
        ThreadTaskDemo t5 = new ThreadTaskDemo("t4");

        es.execute(t1);
        es.execute(t2);
        es.execute(t3);
        es.execute(t4);
        es.execute(t5);

        System.out.println("执行完毕!");
        es.shutdown();
    }
}

public class ThreadTaskDemo implements Runnable{

    @Getter
    @Setter
    private String value;

    public ThreadTaskDemo(){

    }

    public ThreadTaskDemo(String value){
        this.value=value;
    }

    @Override
    public void run() {
        System.out.println("当前时间 "+LocalDateTime.now().getSecond()+" 当前线程名: "+Thread.currentThread().getName()+" BEGIN "+value );
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

通过控制台输出,我们可以看到,任务只在一个线程中有序执行,说明 maximumPoolSize 参数配置无意义,并未有创建线程的操作。

当前时间 35 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 37 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 39 当前线程名: pool-1-thread-1 BEGIN t3
当前时间 41 当前线程名: pool-1-thread-1 BEGIN t4
当前时间 43 当前线程名: pool-1-thread-1 BEGIN t5

演示效果

3.4.2. 有界队列样例

定义核心线程数,corePoolSize 为 1;有界队列ArrayBlockingQueue 设置 3、同时为展示更好的效果,我也让每个线程执行后都sleep 秒钟。

  • 任务数 为 5,和 maximumPoolSize 设置 6 的情况下:我们看到 控制台打印显示有两个线程 pool-1-thread-1pool-1-thread-2

当前时间 30:14 当前线程名: pool-1-thread-2 BEGIN t5
当前时间 30:14 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 30:16 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 30:16 当前线程名: pool-1-thread-2 BEGIN t3
当前时间 30:18 当前线程名: pool-1-thread-2 BEGIN t4
  • 任务数 为 7,和 最大线程池容量 设置 1 的情况下:控制台打印有异常,这是因为我们任务数超出 队列容量最大线程池容量的之和,所以应用执行 RejectedExecutionHandler 策略。并且这时候应用状态时挂起,非常不友好,在下一节,写个案例自定义handler

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task xyz.wongs.interview.thread.pool.ThreadTaskDemo@4dfa3a9d rejected from java.util.concurrent.ThreadPoolExecutor@6eebc39e[Running, pool size = 1, active threads = 1, queued tasks = 3, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at xyz.wongs.interview.thread.pool.ThreadPoolExecutorDemo2.main(ThreadPoolExecutorDemo2.java:29)
当前时间 18:16 当前线程名: pool-1-thread-1 BEGIN t1
当前时间 18:18 当前线程名: pool-1-thread-1 BEGIN t2
当前时间 18:20 当前线程名: pool-1-thread-1 BEGIN t3
当前时间 18:22 当前线程名: pool-1-thread-1 BEGIN t4

有界队列执行顺序

综上所述,在 有界队列实现中我们要注意,任务数最大线程池容量队列容量三者之间的关系。

3.4.3. 自定义handler

编写一个 Java类,实现接口 RejectedExecutionHandler,重写 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法,具体如下

public class CoustomRejectedExecutionHandler implements RejectedExecutionHandler {


    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof ThreadTaskDemo) {
            ThreadTaskDemo thTk = (ThreadTaskDemo) r;
            //为了演示用,所以直接打印,勿模仿。正式场景下应该持久化或者写入日志!!!
            System.out.println("当前任务 "+ thTk.getValue()+" 执行失败!");
        }
    }
}

我们再运行下例子,我们可以发现并没抛出异常,而且控制应用也关闭。

自定义拒绝策略后验证效果

3.5. 禁用Executors创建线程池

通过上面我们简单了解线程池的构造函数参数的意义,我们线程池再线程创建时,其构造函数中指定的队列 LinkedBlockingQueue,这是一种无界的队列,最大值 Integer.MAX_VALUE 即214748364,这队列堆积数量过大,在实际生产中可能直接OOM,不信的话。好奇同学也会说不是还有 newCachedThreadPool,但是它的最大线程数量是 Integer.MAX_VALUE,道理一样,容易造成OOM。

所以很多大型公司在编码规范上都禁止利用 Executors创建线程池。

3.6. 第三方常见创建线程池的方式

3.6.1. 引入 commons-lang3 包方式【不推荐】

ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,threadFactory);
  • 依然给最大线程池赋值无上限。
  • DelayedWorkQueue 延迟阻塞队列,不推荐

3.6.2. 引入 com.google.guava 包方式【一般推荐】

ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build();

ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());

4. 常见问题

4.1. newFixedThreadPool(1) 与 newSingleThreadExecutor 区别

  • newSingleThreadExecutor采用FIFO,保证线程执行顺序,先提交的任务先执行,而newFixedThreadPool(1)不保证。
  • newSingleThreadExecutor方法中,当线程执行出现异常时,它会重新创建一个线程替换之前的线程继续执行,而newFixedThreadPool(1)不行。
目录
相关文章
|
8天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
28 9
|
8天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
9天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
10天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
22 1
|
6月前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
算法 Java 调度
Java由浅入深理解线程池设计和原理1
Java由浅入深理解线程池设计和原理1
172 0
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
4月前
|
监控 Java 开发者
深入理解Java并发编程:线程池的原理与实践
【5月更文挑战第85天】 在现代Java应用开发中,高效地处理并发任务是提升性能和响应能力的关键。线程池作为一种管理线程的机制,其合理使用能够显著减少资源消耗并优化系统吞吐量。本文将详细探讨线程池的核心原理,包括其内部工作机制、优势以及如何在Java中正确实现和使用线程池。通过理论分析和实例演示,我们将揭示线程池对提升Java应用性能的重要性,并给出实践中的最佳策略。
|
4月前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
62 1