线程池的核心知识就是:三大方法、7个参数、拒绝策略、优化配置
线程池原理
程序运行的本质是,占用系统资源,CPU/磁盘网络使用。我们希望可以高效的使用资源!池化技术就是不断的演进出来的。
- 池化技术
简单的说,池化技术就是提前准备一些资源,以供使用。
线程的创建和销毁,以及数据库的连接断开都十分浪费资源。
只有是“池”,就会设计到两个常量:
- minSize:最小容量,核心池子的大小
- maxSize最大容量
这些都是为了弹性访问,保证系统运行的效率。
举一个常见的例子。去银行取钱,一般来说银行会固定开放2个窗口供人办理业务。还有3个业务窗口,只有等到高峰期才会开放使用。银行里提供了一个等待区(候客厅)有3个位置。当你去办理业务时,前面有人正办理,那你就需要坐在等待区,等待传唤。
正常情况下,core大小:2
queue大小:3
maxSize: 5
最可以存在人数:maxSize+queue =8人
为什么要使用线程池
- 提高程序执行效率
- 控制线程的数量,防止程序崩溃
为了减少创建和销毁线程的次数,让每个线程可以多次使用,可根据系统情况调整执行的线程数量,防止消耗过多内存,所以我们可以使用线程池.
Executor 介绍
java.util.concurrent.Executor: 大部分线程池相关的接口都是实现这个接口的。
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
它的子接口和实现的类如下:
Executor接口的关系图例(绿色实线箭头是继承,虚线是接口实现)
三大方法
数组有工具类Arrays,集合有工具类Collections,线程池同样有工具类Executors。利用线程池工具类Executors就可以来创建线程池。
创建线程池的三大方法
ExecutorService threadpool1 = Executors.newFixedThreadPool(5); // 固定线程池大小
ExecutorService threadpool2 = Executors.newCachedThreadPool(); //可以弹性伸缩的线程池,遇强则强
ExecutorService threadpool3 = Executors.newSingleThreadExecutor(); // 只有一个
Executors.newFixedThreadPool(n)的使用
package com.jp.executorDemo; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description: TODO:线程池的创建使用:1.创建固定大小的线程池,超过线程池大小部分,将拒绝 * @Version: 1.0 */ public class ExecutorsDemo1 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool= Executors.newFixedThreadPool(5); //固定大小,可自行设置 try { for (int i = 0; i <10 ; i++) { //2.使用线程池来执行线程 threadPool.execute(()->{ System.out.println("线程:"+Thread.currentThread().getName()+"执行任务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //3.关闭线程池 threadPool.shutdown(); } } }
创建了容量大小为5的线程池,遍历10次去执行线程任务。会发现从始到终只有5个线程交替执行任务
Executors.newCachedThreadPool()的使用
这种线程池遇强则强,会弹性扩张,在实际的工作开发中不建议使用。
package com.jp.executorDemo; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description: TODO:线程池的创建使用:2.创建弹性的线程池,能自动扩张 * @Version: 1.0 */ public class ExecutorsDemo1 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool= Executors.newCachedThreadPool();//弹性线程池,能自动扩张 try { for (int i = 0; i <10 ; i++) { //2.使用线程池来执行线程 threadPool.execute(()->{ System.out.println("线程:"+Thread.currentThread().getName()+"执行任务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //3.关闭线程池 threadPool.shutdown(); } } }
有10个线程在执行任务,运行效果如下:
Executors.newSingleThreadExecutor()的使用
package com.jp.executorDemo; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description: TODO:线程池的创建使用:3.创建单线程池,只有一个线程 * @Version: 1.0 */ public class ExecutorsDemo1 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool= Executors.newSingleThreadExecutor();//单线程池 try { for (int i = 0; i <10 ; i++) { //2.使用线程池来执行线程 threadPool.execute(()->{ System.out.println("线程:"+Thread.currentThread().getName()+"执行任务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //3.关闭线程池 threadPool.shutdown(); } } }
只有一个线程在执行任务:
在实际的工作环境中,上述的几种线程池创建方法都有很大的问题。禁止使用Executors去创建线程池。。我们要使用ThreadPoolExecutor 根据实际业务需要去自定义创建线程池。
阿里巴巴开发文档有这样写到
使用Executors创建的线程池容易发生OOM(内存用尽). 因为它允许的其你去队列大小是integer最大值。
ThreadPoolExecutor 七大参数
在讲 ThreadPoolExecutor 7大参数之前,我们先来分析一下Executors的三大方法创建线程池的底层代码
//固定线程池大小 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //创建弹性线程池 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //创建单线程池 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
可以发现他们三个方法底层使用的都是去实例化一个ThreadPoolExecutor对象,设置了7个参数,这就是线程池创建的核心
public ThreadPoolExecutor(int corePoolSize, // 核心池子的大小 int maximumPoolSize, // 池子的最大大小 long keepAliveTime, // 空闲线程的保留时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 队列 ThreadFactory threadFactory, // 线程工厂,不修改!用来创建线程 RejectedExecutionHandler handler // 拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
7个参数
int corePoolSize, // 核心池子的大小
int maximumPoolSize, // 池子的最大大小
long keepAliveTime, // 空闲线程的保留时间,即在这个时间过后回收空闲的线程
TimeUnit unit, // 时间单位
BlockingQueue workQueue, // 队列
ThreadFactory threadFactory, // 线程工厂,不修改!用来创建线程
RejectedExecutionHandler handler // 拒绝策略
既然了解到了线程池创建的核心,那么我们就使用这个方法去实现之前银行排队办理业务的案例。
总共8人来办理业务,日常只开启2个工作台,有3个队列位置,还有3个工作台只有等队列满了,才会开放。
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、只有队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小,代表核心的2个工作台 5, // 线程池最大大小5,代表最大可开启的工作台 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.AbortPolicy () //拒绝策略,这里使用的是默认的测了:队列满了,就丢弃任务抛出异常! ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <= 8; i++) { //8个人 // 默认在处理 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
因为办理业务是8人,核心工作台为2个,队列只有3个位置,队列已经排满,所以会触发最多线程池,开启5个工作台。
那如果共有9个人来办理业务,而我们设置最多只能存8人,会出现怎么样的结果呢?
只要超过线程池大小就会触发拒绝策略
四种拒绝策略
AbortPolicy (默认的:队列满了,就丢弃任务抛出异常!);
CallerRunsPolicy(哪来的回哪去? 谁叫你来的,你就去哪里处理);
DiscardOldestPolicy (尝试将最早进入队列的任务删除,尝试加入新任务);
DiscardPolicy (队列满了任务也会丢弃,不抛出异常)。
AbortPolicy
使用该策略,当请求线程超过线程池(maximumPoolSize + workQueue)就会丢弃任务抛出异常
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 5, // 线程池最大大小5 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.AbortPolicy () //拒绝策略 ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <= 9; i++) { // 9个人 // 默认在处理 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
我们把人数设置为9,在执行测试
CallerRunsPolicy
使用该策略,当请求线程超过线程池(maximumPoolSize + workQueue)就会哪里的会哪去执行。
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 5, // 线程池最大大小5 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.CallerRunsPolicy () //拒绝策略 ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <= 9; i++) { // 9个人 // 默认在处理 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
执行结果
DiscardOldestPolicy
使用该策略,当请求线程超过线程池(maximumPoolSize + workQueue)就会尝试将最早进入队列的任务删除,尝试加入新任务.
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 5, // 线程池最大大小5 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.DiscardOldestPolicy () //拒绝策略 ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <=15; i++) { // 15个人 // 默认在处理 final int temp=i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."+temp); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
执行结果
DiscardPolicy
使用该策略,当请求线程超过线程池(maximumPoolSize + workQueue)队列满了,后面的任务便会丢弃,不抛出异常
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 5, // 线程池最大大小5 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.DiscardPolicy () //拒绝策略 ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <=15; i++) { // 15个人 // 默认在处理 final int temp=i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."+temp); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
线程池实现原理
提交一个任务到线程池中,线程池的处理流程如下:
1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
优化配置
在工作中,我们应该如何合理的设置线程池的参数呢?通常我们会从2个方面去考虑
- CPU 密集型
- IO 密集型
CPU密集型就是根据最大能支持多少个线程同时跑,一般将线程池的maximumPoolSize(最大线程池) 参数设置与CPU处理器一样大就可以了。可以通过如下方法获取到服务器运行环境的CPU个数:
Runtime.getRuntime.availableProcessors(); //获取到所处运行环境的CPU个数
完整代码
package com.jp.executorDemo; import java.util.concurrent.*; /** * @className: * @PackageName: com.jp.executorDemo * @author: youjp * @create: 2020-05-29 16:12 * @description 描述:线程池7大参数的使用 * 1、队列满了,就会触发最大线程池,否则永远都只是corePoolSize个线程在运行,所以,队列大小一定要根据业务情况进行设置; * 2、当请求线程超过线程池(maximumPoolSize + workQueue),就会触发拒绝策略,至于怎么拒绝,与拒绝策略RejectedExecutionHandler有关。 */ public class ExecutorsDemo2 { public static void main(String[] args) { //1.创建线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, // 核心池子的大小 Runtime.getRuntime().availableProcessors(), // 线程池最大数 2L, // 空闲线程的保留时间 TimeUnit.SECONDS, // 超时回收空闲的线程 new LinkedBlockingDeque<>(3), // *根据业务设置队列大小,队列大小一定要设置* Executors.defaultThreadFactory(), // 不用变 new ThreadPoolExecutor.DiscardOldestPolicy () //拒绝策略 ); try { // 队列 RejectedExecutionException 拒绝策略 for (int i = 1; i <=15; i++) { // 15个人 // 默认在处理 final int temp=i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+" running...."+temp); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
IO 密集型从磁盘读写、 一个线程在IO操作的时候、另外一个线程在CPU中跑,造成CPU空闲。最大线程数应该设置为 IO任务数! 对于大文件的读写非常耗时,我们应该用单独的线程让他慢慢跑。