线程池原理初探以及源码分析(详解)

简介: 线程池原理初探以及源码分析(详解)
1,为什么用线程池

在学java基础的时候,就学过线程的创建方式,如继承Thread类,实现Runnable接口,实现Callable接口这三种,但是在企业级开发中,由于存在多线程以及高并发等现象,如果大量的使用以上三种方式创建线程,则在创建以及销毁线程的过程中,需要消耗大量的资源,为了避免这种现象的发生,并且更好的分配和监控线程,才有了线程池。

2,什么是线程池

线程池,也可以称为线程缓存,和字符串常量池,数据库连接池等原理一样,就是一个池子,都是为了更加高效的对项目进行开发和管理,更好的去分配和管理资源。由于线程是稀缺的资源,所以如果被无限制的创建,不仅会大量消耗系统资源,还会降低系统的稳定性,因此java中提供线程池对线程进行分配,监控调优等。如果在并发中请求的数量非常多,但是执行时间很短,那么在创建销毁时就会花费大量的时间,就可能导致系统的效率大大降低。基本思想就是提前创建线程,重复使用。

3,ThreadPoolExecutor

要明白线程池,这个类ThreadPoolExecutor 是必须要先知道的,并且可以查看类与顶层接口的关系图,(快捷键Ctrl + shift + alt + u可以快速查看)

其关系图如上,主要是实现了顶层接口 Executor,在该接口中存在唯一一个抽象方法execute,主要用于提交任务

由此这个ThreadPoolExecutor类就是对Execute接口的具体的实现,所以接下来重点分析这个类以及他的实现原理。该类的构造方法如下,由于存在多态,由此挑选一个参数最多的构造方法来讲,并重点分析各个参数的意义

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
}

接下来对几个参数做一个初步的定义

int corePoolSize:核心线程数,默认不设置超时时间,当然可以手动设置过期超时时间
int maximumPoolSize:最大线程数 = 非核心线程 + 核心线程数,即线程池里面最多容纳的数量
long keepAliveTime:线程池维护线程所允许的空闲时间,非核心线程超时则会默认被销毁
TimeUnit unit:时间单位,有分钟,s,毫秒,微秒,纳秒等
BlockingQueue<Runnable> workQueue:存放未来得及提交的任务,其本质为一个BlockingQueue的阻塞队列,当线程池
                   中的核心线程满了之后就会将任务放入到阻塞队列里面
ThreadFactory threadFactory:线程工厂默认使用Executors里面的 defaultThreadFactory方法 来创建线程
RejectedExecutionHandler handler:拒绝策略,当线程池里面满了以及阻塞阻塞队列满了的情况下,会触发拒绝策略
4,线程池底层实现流程以及原理

如下图,可能不知道为啥上面要设置这么多参数,接下来看下具体的实现流程就能知道这些参数的意思了

第一步,通过execute方法,将任务提交给线程池,首先,判断线程池中的线程数量,如果数量为空或者数量少于核心线程,则创建线程,并且将该线程标记为核心线程。当线程执行完,再次提交一个任务时,则会再创建一个新的线程,直到线程池中的核心线程数满为止。如上图中的 corePool 中的线程,因此参数核心线程的大小即为corepool的大小。

第二步,如果依旧有任务继续提交,线程池不会立马创建线程,而是将当前的任务放入到阻塞队列里面,当核心线程池中的线程将任务消费之后,则会监听这个阻塞队列,如果阻塞队列不为空,则消费队列里面的任务。由于队列本质是FIFO,先进先出的,所以会先消费最先进入队列的任务,即队头上面的任务。

第三步,在阻塞队列中的任务满了时,每个队列都有容量的大小,可以去了解一下 BlockingQueue 的原理。再提交一个任务,则会触发线程池中创建非核心线程,每个非核心线程来完成提交的任务,直到达到线程池中的最大线程数,并此时创建的线程设置为非核心线程

第四步,当线程池里面满了之后,会将线程丢到拒绝策略里面,即将要进来的任务进行拦截


可能对上面这个描述有一点模糊,接下来举一个具体的银行的实例来形象的描述一下这个执行流程。柜台的窗口就是代表池子里面最大允许存在的线程的数量,绿色的三个窗口代表的就是核心线程的数量,全部窗口代表的就是线程池的允许存在的最大线程的数量,下面的蓝色部分等候区,就是对应的阻塞队列 BlockingQueue,接下来具体流程如下


87fa4c8954ea4f7387f807b6fbcf0c10.png

1,假设今天只开了三个窗口,对应的就是三个核心线程的数量,一个人来办理业务就对应一个窗口,即为对应的核心线程

2,如果三个窗口满了,则下一个人在等候区等待,就是对应的阻塞队列,当然等候区的座位有限,即对应队列的长度也是有 限的,不过第一个人来到等候区会第一个办理业务,即对应队列的性质,先进先出

3,等候区也满了,银行发现今天来办理业务的人多,因此就会多开窗口临时接待这些办理业务的人员,该窗口对应的就是非核心线程,当然窗口有限,因此非核心线程的数量也是有限的,又因为该窗口为临时窗口,所以在没人的时候或者人数少的时候,该窗口也会被关闭,让临时工休息或者解雇,以防止资源的浪费,因此对应的非核心线程就被设置了空闲状态下的超时时间,也是为了避免资源的浪费

4,如果柜台的人数也满了,等候区的人数也满了,即达到饱和状态,银行就会劝退接下来要来办理业务的人,让他们下午来或者明天来,当然不同人有不同的劝退方式,这就是线程池的存在的多种拒绝策略的原因


赠人玫瑰,手留余香,上面这一段例子主要是参考这位大佬的视频讲解的https://www.bilibili.com/video/BV1dt4y1i7Gt?spm_id_from=333.880.my_history.page.click

5,源码初探

在整个原理流程图中,主要由execute方法来决定执行到哪一步,接下来查看一下execute方法的底层逻辑,大概在当前类的1342行左右,找不到可以直接 ctrl + f12,输入这个方法就能找到了。

具体流程英文注释已经写得很清楚了,如果懒得百度的话可以安装一个网易云有道词典,直接截屏翻译即可,对面那些不友好不能复制的bug也是可以直接截屏翻译的。其具体流程如下,对部分代码进行解释

// ctl中记录这当前线程池的运行状态以及线程的数量
int c = ctl.get();
//workerCountOf 用于表示当前活动的线程数
if (workerCountOf(c) < corePoolSize) {
  //addWorker():在线程池中创建一个新的线程并执行,其中第二个参数为设置成核心线程
    if (addWorker(command, true))
        return;
    //如果添加失败,则重新获取ctl的值
    c = ctl.get();
}
//如果当前线程池是运行状态并且任务添加到阻塞队列成功
//这个offer方法是一个阻塞队列的添加任务的方法,返回值为布尔类型
//在不满足第一个if并且线程处于运行状态,则说明活动线程数应该 >= 核心线程数,此时要做的就是将任务加入到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //如果不是运行状态,由于之前已经把command 添加到workQueue中了,
        //这时需要移除或者丢弃当前进来的这个command,并对后面的任务直接开启拒绝策略
        if (! isRunning(recheck) && remove(command))
          //拒绝策略
            reject(command);
        //处于运行状态但是线程池的有效线程数为0
        else if (workerCountOf(recheck) == 0)
          //依旧在线程池中创建一个线程,null表示没有任务,即不去启动线程池,false代表设置成非核心线程
          //其主要目的是保证线程在运行状态下,存在一个执行任务的线程,因为队列里面已经添加了一个任务
            addWorker(null, false);
}
//到这里的话意思就是可能当前线程池不处于running状态
//或者说此时核心线程数量已满,队列也已满,则触发创建非核心线程
//当然这里是不满足上述情况,即可能超过最大的线程池的容量,则触发拒绝策略
else if (!addWorker(command, false))
        reject(command); 

可能对上面的那个addWorker方法可能不太了解他是如何实现的,接下来继续看看他的底层源码,addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少maximumPoolSize

d1537d0f0dcb4eaba7d467db27394941.png

大家可以自行看一下里面的源码,都比较简单,主要是对线程池的状态进行的一些判断,如果处于非运行状态,则直接return false。当然如果处于运行状态,就会对有效的线程数进行判断,是设置为核心线程还是非核心线程,并通过Worker方法来创建对象,下面最终也是通过调用start方法来启动线程

与此同时,可能会出现一个问题,就是如果在执行任务的过程中,如果其中一个任务出现了异常,该任务会被如何处理,以及后面的任务能否被线程池的线程执行。接下来看以下这段代码,该段代码在执行线程任务的 runWorker 方法里面,可以发现他处理异常是一只往外抛,并没有对异常做具体的捕获,因此在多任务里面,如果中途一个任务出现异常,则会直接被忽视该任务,并且该任务也会直接会丢弃,直接执行下一个任务。所以在重要的日志等重要信息中,最好手动去捕获一下异常。因此线程池不会中途一个任务出现异常而导致线程池中断运行。

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
}
6,线程池状态

如下图,线程池的状态依旧在该类下面的300多行,以一个私有静态的final类表示。运算符有 & 按位与,|按位或,^按位异或,~取反,>>右移,<<左移等。这里采用左移,比较好解释,一个二进制 00000001 = 1,在左移两位之后,就是相当于整体左移,然后原来的被移动的空位补0即可(学校应该都会学…),最后变成 00 00000100 = 2的2次方 = 4。

再如下图,这个 COUNT_BITS = Integer.SIZE - 3,Integer.SIZE点进去可以发现他的大小为 32,即以二进制补码的关系表示整型包装类的大小。即整型的长度为32位


接下来就好办了,取其中一个数就好了,由于负数涉及到二进制的补码问题,所以取一个整数示例。如上面的STOP = 1,则它的二进制为 00000001,而在进行左移32-3=29,即左移了29位,大概就是 001(00…00),括号里面跟29位0或者1,其他的照推,运行状态为111(后面跟29个0或者1)。因此这个状态就相当于取高位的前三位,111,000,001,010,011,刚好对应这五种状态。然后下面有一个容量 CAPACITY ,为(1 << COUNT_BITS) - 1,其实我一开始也想了一下,为啥要减一,就是高三位最小的是001,所以后面的29位二进制最大值就是全为1的时候最大,就等于(001 << COUNT_BITS) - 1。刚好就是前3位高进制代表运行状态,后29位用于保存线程池的容量,大概就是 2的29次方 - 1,当然这只是理论上的容量,也是最大容量,但是现实生产环境还得考虑吞吐量,cpu等等实际环境,需要根据实际的具体情况设置线程池容量的大小。一般来讲为了避免资源的浪费,会将线程池容量大小和核心线程的数量大小设置接近或者一样。


7f7577fc765548f985f5e32efeacf09a.png

在上图中,就是五种状态的关系。接下来简单的描述一下这五种状态

1、RUNNING运行状态时

能够接收新任务,以及对已添加的任务进行处理,线程池被一旦被创建,就处于RUNNING状态,并且线程池中此时的任务数为0,其对应的高三位为 111。011

2、 SHUTDOWN

不可以接收新任务,但能处理已添加的任务,如阻塞队列里面的任务,调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN,其对应的高三位为000。

3、STOP

不接收新任务,不处理已添加的任务,并且会中断正在处理的任务,在调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP,其对应的高三位为001。

4、TIDYING

当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING,其对应的高三位为010。

5、 TERMINATED

线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING 转化为TERMINATED。其对应的高三位为010

7,常用线程池创建方式

jdk1.8之前,自带了如下4种创建线程池的方式,主要是在Executors类下的,主要有newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool四种方式,通过源码可以发现该四种都是使用了 ThreadPoolExecutor 来实现,因此接下来可以通过底层分析四种常用线程池的工作原理

1,newSingleThreadExecutor

single,熟悉的单例模式。顾名思义,即只创建只有一个线程的线程池。一般在单线程的情景才会使用,因此用处不多。通过上述线程池的原理讲解也可知,该线程池的核心线程和最大线程都是固定为1,所以只能排队进入队列一个个执行,再根据FIFO,顺序执行。

dd7688b663754178843f6ee3d7131595.png


2,newFixedThreadPool

该线程池被称为固定大小线程池,看ThreadPoolExecutor 前面两个参数都是 nThreads,并且是外部传进来的,即核心线程和最大线程一样,整个池子里面的线程都是核心线程。那就相当于传10000个任务进来,可以直接被线程池全部执行,与此同时,线程池需要创建10000个线程,这样子比单例快,但是可能执行一次需要占用大量的CPU,并且线程池里面的线程不能实现复用。当然,假设有10000个任务过来,设置的核心和最大线程数量为100,多余的进入阻塞队列,假设阻塞队列足够长的话,可以将任务存储到队列中,这样子虽然可以实现核心线程的复用,但是由于线程池容量的大小有限,需要经过大量时间排队等在,也会降低该线程池的效率

7b4512406d6546cd85a0fa86e56b164d.png


3,newCachedThreadPool

缓存线程池,带有缓存功能的线程池,这种线程池具有线程复用的功能。根据参数可知核心线程的数量为0,但是最大线程数为2的32次方-1,即线程池的容量大小相对于是比较大的,并且允许闲置 60再销毁该线程 。而线程池里面的线程都是非核心线程,相当于里面的线程都是临时工,有任务就干,没任务再见这种,对资源相对保障。但是这个非核心线程只有超过最大的空闲时间才会被销毁,只要在这段时间有任务进来,就开通过该线程去执行任务,而不需要重新创建线程,大大提高了线程的复用率,同时也减少了资源的开销,该线程池运行速率相对较快,效率相对较高

26fd5b96f56242739fca4cbd027da993.png


4,newScheduledThreadPool

顾名思义,就是延迟后运行命令或者定期地执行任务。这里没什么好说的


cd1ba2a40f324488b684cc8a41e8c66d.png

通过分析就发现这四种创建线程的方式,发现其底层只是通过改变了ThreadPoolExecutor 的核心线程数以及最大线程数这个两个参数的值,通过改变参数而实现不同场景下的不同功能,从而实现了这四种线程池。当然并不能说这四种线程池方式哪种好哪种坏,还需要根据实际的业务情况还选取具体的线程池,合适才是最好的。

8,线程池饱和拒绝策略

通过源码分析,其最顶端的父接口是RejectedExecutionHandler接口,其中存在rejectedExecution抽象方法,接下来通过该接口的具体实现类来了解对应的策略

1,CallerRunsPolicy

其为ThreadPoolExecutor 类里面的一个静态内部类,英文注释也说的很明白了,用调用者所在的线程来执行任务

2,AbortPolicy,直接抛出异常,该策略为默认策略

3,DiscardPolicy,直接丢弃任务

4,DiscardOldestPolicy,丢弃阻塞队列中靠最前的任务,并执行当前任务

5,当然我们也可以根据具体的业务场景,自定义一个实现RejectedExecutionHandler接口的类,重写里面的抽象方法即可

9,总结

在限流,响应式编程,redis-分布式锁,定时任务底层等等都是基于线程池来实现的,本篇主要探讨线程池的原理以及一些底层实现。当然线程以及线程池都是稀缺资源,因此线程池和其他的数据库连接池一样,在不使用的情况下,需要及时的进行shutdown关闭,避免造成资源浪费。

相关文章
|
8天前
|
Java
并发编程之线程池的底层原理的详细解析
并发编程之线程池的底层原理的详细解析
45 0
|
24天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
1月前
|
算法 安全 调度
多线程如何工作,工作原理是什么
多线程如何工作,工作原理是什么
|
3月前
|
存储 安全 调度
探索Python的多线程编程:原理与实践
探索Python的多线程编程:原理与实践
60 0
|
30天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
230 2
|
1月前
|
设计模式 安全 C++
【C++ const 函数 的使用】C++ 中 const 成员函数与线程安全性:原理、案例与最佳实践
【C++ const 函数 的使用】C++ 中 const 成员函数与线程安全性:原理、案例与最佳实践
238 2
|
1月前
|
Java 程序员 API
【深入探究 Qt 线程】一文详细解析Qt线程的内部原理与实现策略
【深入探究 Qt 线程】一文详细解析Qt线程的内部原理与实现策略
79 0
|
1月前
|
Java API 开发者
springboot 多线程的使用原理与实战
在Spring Boot中实现多线程,主要依赖于Spring框架的@Async注解以及底层Java的并发框架。这里将深入剖析Spring Boot多线程的原理,包括@Async注解的工作方式、任务执行器的角色以及如何通过配置来调整线程行为。
44 5
|
1月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
56 2
|
1月前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
45 1