先看几道多线程相关的问题
1、三个线程 a、b、c并发运行,b,c线程需要a的数据怎么实现?
难点:
1、是让 ThreadB 和 ThreadC 等待 ThreadA 先执行完
2、 ThreadA 执行完之后给ThreadB和ThreadC发送消息
思路:我们必须让ThreadB和ThreadC去等待ThreadA完成任务后发出的消息
并使用Semaphore类来控制线程的等待acquire和释放release()方法释放permit
1、线程池ThreadPoolExecutor的实现原理?
Java线程池的工作原理为:
- JVM先根据用户的参数创建一定数量的可运行的线程任务,并将其放入队列中,在线程创建后启动这些任务,如果线程数量超过了最大线程数量(用户设置的线程池大小),则超出数量的线程排队等候,在有任务执行完毕后,线程池调度器会发现有可用的线程,进而再次从队列中取出任务并执行。
- 通过减少频繁创建和销毁线程来降低性能损耗。每个线程都需要一个内存栈,用于存储诸如局部变量、操作栈等信息,可以通过-Xss参数来调整每个线程栈大小(1024k)
- 线程池一般配合队列一起工作,使用线程池限制并发处理任务的数量,然后设置队列的大小,当任务超过队列大小时,创建新线程来处理任务,如果当前线程超出 maximumPoolSize,任务将被拒绝,通过一定的拒绝策略来处理,这样可以保护系统免受大流量而导致崩溃。
特点:线程池一般有核心池大小和线程池最大大小设置,当线程池中的线程空闲一段时间后会被回收,而核心线程池中的线程不会被回收。
线程池的主要作用:
- 线程复用、线程资源管理、控制操作系统的最大并发数,以保证系统高效(通过线程资源复用实现)且安全(通过控制最大线程并发数实现)地运行
1.1、线程复用
在Java中,每个Thread类都有一个start方法。在程序调用start方法启动线程时,Java虚拟机会调用该类的run方法。前面说过,在Thread类的run方法中其实调用了Runnable对象的run方法,因此可以继承Thread类,在start方法中不断循环调用传递进来的Runnable对象,程序就会不断执行run方法中的代码。可以将在循环方法中不断获取的Runnable对象存放在Queue中,当前线程在获取下一个Runnable对象之前可以是阻塞的,这样既能有效控制正在执行的线程个数,也能保证系统中正在等待执行的其他线程有序执行。这样就简单实现了一个线程池,达到了线程复用的效果。
1.2、线程池的核心组件和核心类
Java线程池主要由以下4个核心组件组成。
◎ 线程池管理器:用于创建并管理线程池。
◎ 工作线程:线程池中执行具体任务的线程。
◎ 任务接口:用于定义工作线程的调度和执行策略,只有线程实现了该接口,线程中的任务才能够被线程池调度。
◎ 任务队列:存放待处理的任务,新的任务将会不断被加入队列中,执行完成的任务将被从队列中移除。
Java中的线程池是通过Executor框架实现的,在该框架中用到了Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future、FutureTask这几个核心类,具体的继承关系如图所示
其中,ThreadPoolExecutor是构建线程的核心方法,该方法的定义如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
ThreadPoolExecutor构造函数的具体参数如表所示:
底层原理:
- 几个重要的字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
- ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿
ctl相关方法
- runStateOf:获取运行状态;
- workerCountOf:获取活动线程数;
- ctlOf:获取运行状态和活动线程数的值。
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
1.3、Java线程池的工作流程
Java线程池的工作流程为:线程池刚被创建时,只是向系统申请一个用于执行线程队列和管理线程池的线程资源。在调用execute()添加一个任务时,线程池会按照以下流程执行任务。
◎ 如果正在运行的线程数量少于corePoolSize(用户定义的核心线程数),线程池就会立刻创建线程并执行该线程任务。
◎ 如果正在运行的线程数量大于等于corePoolSize,该任务就将被放入阻塞队列中。
◎ 在阻塞队列已满且正在运行的线程数量少于maximumPoolSize时,线程池会创建非核心线程立刻执行该线程任务。
◎ 在阻塞队列已满且正在运行的线程数量大于等于maximumPoolSize时,线程池将拒绝执行该线程任务并抛出RejectExecutionException异常。
◎ 在线程任务执行完毕后,该任务将被从线程池队列中移除,线程池将从队列中取下一个线程任务继续执行。
◎ 在线程处于空闲状态的时间超过keepAliveTime时间时,正在运行的线程数量超过corePoolSize,该线程将会被认定为空闲线程并停止。因此在线程池中所有线程任务都执行完毕后,线程池会收缩到corePoolSize大小。
具体的流程如下图所示:
1.4、 线程池的拒绝策略
若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。JDK内置的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 这4种,默认的拒绝策略在 ThreadPoolExecutor 中作为内部类提供。在默认的拒绝策略不能满足应用的需求时,可以自定义拒绝策略
1.AbortPolicy (默认)
- 直接抛出异常,没有特殊需求直接使用该策略即可
- 具体的JDK源码如下
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } /** * Always throws RejectedExecutionException. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
2.CallerRunsPolicy
- CallerRunsPolicy的拒绝策略为:如果被丢弃的线程任务未关闭,则执行该线程任务。注意,CallerRunsPolicy拒绝策略不会真的丢弃任务。
- SynchronousQueue一般会搭配 CallerRunsPolicy 使用,这2个是个绝佳组合
- 这个组合起到的效果是:当线程池处理不过来时,直接交由调用者线程(往线程池里添加任务的主线程)来执行,此时任务不会被积压在队列里,同时调用者线程无法继续提交任务
- 具体的JDK实现源码如下
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Action:SynchronousQueue是啥?
- SynchronousQueue 不是一个真正的队列,而是一种在线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。如果没有线程等待,并且线程池的当前大小小于 maximumPoolSize,那么线程池将创建一个线程,否则根据拒绝策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被放在队列中,然后由工作线程从队列中提取任务。只有当线程池是无界的或者可以拒绝任务时,该队列才有实际价值,Executors.newCachedThreadPool使用了该队列。
3.DiscardOldestPolicy
- DiscardOldestPolicy的拒绝策略为:移除线程队列中最早的一个线程任务,并尝试提交当前任务。实际开发中不怎么使用
- 具体的JDK实现源码如下
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
4.DiscardPolicy
- DiscardPolicy的拒绝策略为:丢弃当前的线程任务而不做任何处理。如果系统允许在资源不足的情况下丢弃部分任务,则这将是保障系统安全、稳定的一种很好的方案。
- 这个策略一般在线程池执行的是不太重要的任务时使用
- 具体的JDK实现源码如下
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
5.自定义拒绝策略
- 以上4种拒绝策略均实现了RejectedExecutionHandler接口,若无法满足实际需要,则用户可以自己扩展RejectedExecutionHandler接口来实现拒绝策略,并捕获异常来实现自定义拒绝策略。
- 下面实现一个自定义拒绝策略DiscardOldestNPolicy,该策略根据传入的参数丢弃最老的N个线程,以便在出现异常时释放更多的资源,保障后续线程任务整体、稳定地运行。
- 具体的JDK实现源码如下:
- 商品中心使用的,拓展接口实现拒绝策略
/** * 线程池已经无法处理,重写饱和策略,直接不执行丢弃任务,记录error日志 */ private static class RewriteRunsPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("线程池:{} 需要处理的任务已经超过任务队列长度(当前队列长度:{}), 需要当前的工作线程自行处理当前任务", NAME, executor.getQueue().size()); } }
1.5、线程池的5种状态
状态 | 特点 |
1、运行RUNNING | 会接收新任务、处理阻塞队列中的任务; |
2、关闭SHUTDOWN | 不会接收新任务,会处理阻塞队列中的任务; |
3、停止STOP | 不接收,不处理任务,会中断正在运行的任务; |
4、休息TIDYING | 如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态 |
5、终止TERMINATED | 停止工作; |
shutdown()和shutdownNow()区别
- 原理:遍历线程池中的工作线程,然后逐一调用线程的interrupt方法来中断线程
- 区别:
- shutdown():不立即终止线程池,要等所有任务队列中的任务都执行完后才终止,不会接受新的任务 //状态设置为STOP
- shutdownNow():立即终止线程池,中断正在执行的任务,清空任务缓存队列,返回尚未执行的任务 //状态设置为SHUTDOWN
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
下图为线程池的状态转换过程:
2、Executor框架
主要由3大部分组成
任务 | 执行任务需要实现的接口:runnable接口和callable接口 |
任务的执行 | 核心接口Executor,以及继承自Executor的executorService 接口,两大实现类ThreadPoolExecutor和scheduledThreadpoolExecutor |
异步计算的结果 | 包括接口Future和实现类Futuretask |
3、java提供了ExecutorService的三种实现:
3.1、ThreadPoolExecutor:标准线程池 阿里
序号 | 参数 | 含义 |
1 | corePoolSize | 核心线程池大小,线程池维护的线程最小大小,即没有任务处理情况下,线程池可以有多个空闲线程(少于corePoolSize的一直创建线程,即使有线程空闲;空闲线程不释放,任务数多于corePoolSize的任务放入阻塞队列) |
2 | maximumPoolSize | 线程池最大线程大小 |
3 | keepAliveTime | 线程池中线程的最大空闲时间,存活时间超过该时间的线程会被回收。 |
4 | workqueue | 线程池使用的任务缓冲队列(包括:有界阻塞队列 ArrayBlockingQueue, //有界阻塞队列需要设置合理的队列大小;有界/无界阻塞链表队列 LinkedBlockingQueue;优先级阻塞队列 PriorityBlockingQueue;无缓冲区阻塞队列 SynchronousQueue) |
5 | ThreadFactory | 创建线程的工厂,我们可以设置线程的名字,是否是后台线程;作用:统一创建线程时的参数,如是否守护线程,默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称 |
6 | rejectedExecutionHandler | (京东面试题,蚂蚁金服)当缓冲队列满后的拒绝策略,包括:1、Abort(直接抛出 rejectedExecutionException);2、Discard(按照LIFO丢弃)新任务被抛弃;3、DiscardOldest(按照LRU丢弃)旧任务被抛弃;4、CallerRuns(主线程执行)在调用者的线程中运行新的任务,既不抛弃任务也不抛出异常;5、自定义策略(用于记录日志或持久化存储不能处理的任务) 详情见1.4、 线程池的拒绝策略 |
workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
- 直接切换:
- 这种方式常用的队列是SynchronousQueue,但现在还没有研究过该队列,这里暂时还没法介绍;
- 使用无界队列:
- 一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:
- 一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
- 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
- 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
- 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
线程池工具类Demo: 核心线程数2、最大线程数4 失活时间60s,阻塞队列1000,用来处理刷数据需求。
public class XxxThreadPoolUtil { /** * 线程池名称 */ private static final String NAME = "XxxThreadPool"; private static final AtomicInteger threadNum = new AtomicInteger(1); private static final ThreadPoolExecutor SEND_EXECUTOR = new ThreadPoolExecutor( 2,4,60, TimeUnit.MINUTES,new ArrayBlockingQueue<>(1000), r -> new Thread(r, NAME + "-" + threadNum.getAndDecrement()), new RewriteRunsPolicy() ); // 饿汉式单例模式 public static ThreadPoolExecutor getThreadPool(){ return SEND_EXECUTOR; } /** * 线程池已经无法处理,重写饱和策略,直接不执行丢弃任务,记录error日志 */ private static class RewriteRunsPolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("线程池:{} 需要处理的任务已经超过任务队列长度(当前队列长度:{}), 需要当前的工作线程自行处理当前任务", NAME, executor.getQueue().size()); } } } // 线程池的使用 XxxThreadPoolUtil.getThreadPool().submit(() -> sendXxx(obj));
3.2、ScheduledThreadPoolExecutor:支持延迟任务的线程池
使用executors来创建,两种子类(此线程池使用较少)
1、scheduledthreadpoolExecutor
2、singleThreadscheduledExecutor
3.3、ForkJoinPool:(分支/合并框架)java7引入
类似于ThreadPoolExecutor,就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个小任务运算的结果进行join汇总。
可以使用work-stealing模式,其会为线程池中的每个线程创建一个队列,用work-stealing(任务窃取)算法使得线程可以从其他线程任务里窃取任务来执行。即如果自己的任务处理完成了,则可以去忙绿的工作线程那里窃取任务执行。
fork/join框架与线程池的区别
1、采取的是使用work-stealing模式
2、一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架中,如果某个子问题由于等待另一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行,这种方式减少了线程的等待时间,提高了性能。
3、任务的类型:
- 1、没有任何返回值,不需要join,使用recursiveAction
比如写数据到磁盘,然后就退出(一个 RecursiveAction可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行) - 2、任务有返回值:使用recursiveTask
子任务的执行结果合并到一个集体结果
3.4、ThreadPoolExecutor 线程池, corePoolSize=5,maximumPoolSize=10, queueCapacity=10, 有 20 个耗时任务 交给这个线程池执行, 线程池会如何执行这 20 个任务? 阿里
- 如果当前线程数<corePoolSize, 如果是则创建新的线程执行该任务 。
- 如果当前线程数>=corePoolSize, 则将任务存入 BlockingQueue 。
- 如果阻塞队列已满, 且当前线程数<maximumPoolSize, 则新建线程执行该任务。
- 如果阻塞队列已满, 且当前线程数>=maximumPoolSize, 则抛出异常 。
- RejectedExecutionException, 告诉调用者无法再接受任务了
3.5、用户发消息任务超出队列, 你用哪个拒绝策略? 有其他方法吗 ?阿里
ThreadPoolExecutor.CallerRunsPolicy
方法:
- 无界队列( LinkedBlockingQuene) ,继续添加任务到阻塞队列中等待执行。
- 用消息队列存任务数据, 在线程池慢慢处理
4、常见的线程池: 线程池的接口类是Executors,有一些静态方法
- Java定义了Executor接口并在该接口中定义了execute()用于执行一个线程任务,然后通过ExecutorService实现Executor接口并执行具体的线程操作。ExecutorService接口有多个实现类可用于创建不同的线程池,如表所示是5种常用的线程池
1、newFixedThreadpool
- newFixedThreadPool用于创建一个固定线程数量的线程池,并将线程资源存放在队列中循环使用。在newFixedThreadPool线程池中,若处于活动状态的线程数量大于等于核心线程池的数量,则新提交的任务将在阻塞队列中排队,直到有可用的线程资源
ExecutorService executor = Executors.newFixedThreadPool(10);
等价于return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue())
说明:指定线程数,使用LinkedBlockingQueue 链表阻塞队列- 特点:
- 当线程池没有可执行任务时,线程空闲不释放,由于使用的是无界队列,队列原则上不会限制队列大小,以至于线程池中的任务不会超过corePoolSize
2、newSingleThreadpool
- 单个线程
ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue())
说明:初始化只有一个线程,内部使用 LinkedBlockingQueue 阻塞队列- 特点:保证所提交任务的顺序执行,如果该线程异常结束,会重新创建一个新的线程继续执行任务
3、newCachedThreadpool
- 可缓存的线程池,newCachedThreadPool用于创建一个缓存线程池。之所以叫缓存线程池,是因为它在创建新线程时如果有可重用的线程,则重用它们,否则重新创建一个新的线程并将其添加到线程池中。对于执行时间很短的任务而言,newCachedThreadPool线程池能很大程度地重用线程进而提高系统的性能。
- 线程数量不固定 最大线程数量 Integer.MAX_VALUE,其使用SynchronousQueue队列,一个没有数据缓冲的阻塞队列。对其执行put操作后,必须等待take操作消费该数据,反之亦反,等价于
new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue())
;| - 特点:
- 1、在线程池的keepAliveTime时间超过默认的60秒后,该线程会被终止并从缓存中移除,因此在没有线程任务运行时,newCachedThreadPool将不会占用系统的线程资源;
- 2、当提交新任务时,复用未超过60s的空闲线程,若没有空闲线程,则创建新线程;
- 3、SynchronousQueue是没有容量的阻塞队列/每一个put操作必须要等待一个线程的take操作;缺点:使用时注意控制并发的任务数,防止因创建大量的线程导致而降低性能
- 4、在创建线程时需要执行申请CPU和内存、记录线程状态、控制阻塞等多项工作,复杂且耗时。因此,在有执行时间很短的大量任务需要执行的情况下,newCachedThreadPool能够很好地复用运行中的线程(任务已经完成但未关闭的线程)资源来提高系统的运行效率。具体的创建方式如下:
ExecutorService fService = Executors.newCachedThreadPool();
4、newScheduledThreadPool
- 支持延迟执行的线程池,可设置在给定的延迟时间后执行或者定期执行某个线程任务,使用delayedWorkQueue实现任务延迟。比timer更强大,因为timer对应的是单个后台程序,而ScheduledThreadPool可以在构造函数中指定多个线程
5、newWorkStealingPool
- newWorkStealingPool创建持有足够线程的线程池来达到快速运算的目的,在内部通过使用多个队列来减少各个线程调度产生的竞争。这里所说的有足够的线程指JDK根据当前线程的运行需求向操作系统申请足够的线程,以保障线程的快速执行,并很大程度地使用系统资源,提高并发计算的效率,省去用户根据CPU资源估算并行度的过程。当然,如果开发者想自己定义线程的并发数,则也可以将其作为参数传入
- JDK1.8新增
线程池中,线程对象的两种实现方式: 创建Runnable接口子类对象 重写run方法 创建callable接口子类对象 重写call方法
线程池执行后的返回值:
返回Executorservice接口,接口的对象可以调用submit方法来执行线程池中的线程
5、如何合理地配置线程池 美团
5.1、线程池的适用场景?
- 单个任务处理时间比较短
- 需要处理的任务数量很大
5.2、使用线程池的好处
- 降低资源消耗。
- 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。
- 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。
- 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
5.3、如何合理地配置线程池
根据任务类型是IO密集型还是cpu密集型、任务的优先级,任务的执行时间,任务是否依赖其他系统资源,来设置合理的线程池大小、队列大小、拒绝策略,并进行压测和不断调优来决定适合自己场景的参数。
5.3.1、线程池大小该怎么配置
- 1、cpu密集型(N + 1)经常进行上下文切换,因此配置尽可能小的线程
- 可以将线程数设置为 N( CPU 核心数) +1, 比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断, 或者其它原因导致的任务暂停而带来的影响。 一旦任务暂停, CPU 就会处于空闲状态, 而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- 如何获取CPU核心数
- 商品中心CPU 4核心
/** * CPU的核数 */ private static final int NCPUS = Runtime.getRuntime().availableProcessors();
- 2、IO密集型(2N)因为经常进行IO操作,可以分配多一点线程
- 这种任务应用起来, 系统会用大部分的时间来处理 I/O 交互, 而线程在处理 I/O 的时间段内不会占用 CPU 来处理, 这时就可以将 CPU 交出给其它线程使用。 因此在 I/O 密集型任务的应用中, 我们可以多配置一些线程, 具体的计算方法是 2N
- 更合理的计算方式:
- 我们日常的开发中,我们的任务几乎是离不开I/O的,常见的网络I/O(RPC调用)、磁盘I/O(数据库操作),并且I/O的等待时间通常会占整个任务处理时间的很大一部分,在这种情况下,开启更多的线程可以让 CPU 得到更充分的使用,一个较合理的计算公式如下:
- 线程数 = CPU数 * CPU利用率 * (任务等待时间 / 任务计算时间 + 1)
- 例如我们有个定时任务,部署在4核的服务器上,该任务有100ms在计算,900ms在I/O等待,则线程数约为:4 * 1 * (1 + 900 / 100) = 40个,
- 如果计算时间和I/O时间相等,等价于2N
Action1:如何判断是 CPU 密集任务还是 IO 密集任务?
- CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取, 文件读取这类都是 IO 密集型, 这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少, 大部分时间都花在了等待 IO 操作完成上。
Action2:IO 密集 = Ncpu*2 是怎么计算出来?
- 经验值
- I/O 密集型任务任务应用起来, 系统会用大部分的时间来处理 I/O 交互, 而线程在处理I/O 的时间段内不会占用 CPU 来处理, 这时就可以将 CPU 交出给其它线程使用。 因此在I/O 密集型任务的应用中, 我们可以多配置一些线程。 例如: 数据库交互, 文件上传下载, 网络传输等。 IO 密集型, 即该任务需要大量的 IO, 即大量的阻塞, 故需要多配置线程数。
Action3:实际项目中不要使用,就算只开1个线程,也要用线程池,因为每次创建和回收线程都是需要开销的
5.3.2、存活时间该怎么配置
- keepAliveTime、TimeUnit
- 这两个参数一起决定了非核心线程空闲后的存活时间
- 实际使用过程中不要设置太离谱的值一般问题不大,我个人一般使用5分钟或30分钟
5.3.3、队列大小该怎么配置
- 常见的队列有 ArrayBlockingQueue 和 LinkedBlockingQueue
- 两者的主要区别在于 ArrayBlockingQueue 占用空间会更小,而 LinkedBlockingQueue 在生产者和消费者使用了不同的锁性能会好一点。
- 通常情况下,两者的区别微乎其微,除非你要处理的任务量非常非常大,此时你需要仔细考虑使用哪个更合适,否则通常情况下两个随便选都可以
- 常见的坑:
- 使用 LinkedBlockingQueue 时没设置队列大小,也就是使用了无界队列(Integer.MAX_VALUE),任务处理不过来,不断积压在队列里,最终造成内存溢出
- 使用 LinkedBlockingQueue 一定要设置队列大小
- 对于优先级不同的任务
- 使用优先级队列PriorityBlockingQueue,让优先级高的任务先执行
- 执行时间不同的任务
- 可以使用不同规模的线程池/使用优先级队列,耗时短的先执行
- 依赖数据库连接池的任务
- 线程数应该设置大点
- 建议使用有界队列增加系统的稳定性
- 使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
- 在使用Executors.newFixedThreadPool时,因没有设置队列大小,默认为integer.MAX_VALUE,如果有大量任务被缓存到LinkedBlockingQueue中等待线程执行,则会出现GC慢等问题,造成系统响应慢甚至OOM,因此,在使用线程池时务必设置池大小,队列大小并设置相应的拒绝策略。
5.3.4、线程工厂该怎么配置
通常使用默认的就行。
常见的改动场景是
- 给线程设置个自定义的名字,方便区分。
- 这种场景下,可以使用一些工具类提供的现有方法,也可以将 DefaultThreadFactory 拷贝出来自己修改一下
5.3.5、拒绝策略该怎么配置
拒绝策略,线程池处理不过来时的策略
详情见:1.4、 线程池的拒绝策略
6、使用线程池时遇到过的问题
1、在java的多线程中,一旦线程关闭,就会成为死线程。关闭后死线程就没有办法再启动了。再次启动就会出现异常信息:Exception in thread “main” java.lang.IllegalThreadStateException。
那么如何解决这个问题呢? 可以使用Executors.newSingleThreadExecutor()来再次启动一个线程。
2、我们后台任务线程池的队列和线程池全满了,不断抛弃任务的异常,经过排查,发现是数据库出现了问题,导致sql执行的非常慢。
3、线上问题定位
- 线上问题定位就只能看日志、系统状态和dump线程。
- 1、在linux命令行下使用top命令查看每个进程的情况
关注command是java的性能数据 使用top的交互命令数字1查看每个CPU的性能数据
H:查看每个线程的性能信息
- 会出现3种情况:
- 1、cpu利用率100%,说明这个线程可能死循环,可能是GC造成,可以使用jstat命令查看GC情况;
- 2、一直在top10,说明线程可能有性能问题;
- 3、cpu利用率高的几个线程在不停变化,说明并不是由某一个线程导致CPU偏高。
7、ExecutorService使用: 类似于一个线程池
1、作用:
- 一个线程将一个任务委派给一个 ExecutorService 去异步执行。 一旦该线程将任务委派给 ExecutorService,该线程将继续它自己的执行,独立于该任务的执行。
2、 几种不同的方式来将任务委托给 ExecutorService。
方法 | 特点 |
1、execute(Runnable) | 没有办法得知被执行的结果 |
2、submit(Runnable) | 返回一个 Future 对象;这个Future对象可以用来检查Runnable是否已经执行完毕 //但new Runnable()无法返回数据信息 |
3、submit(Callable) | 返回一个Future对象;new Callable()可以返回数据信息;可判断当前的线程是否执行完毕 |
4、invokeAny() | |
5、invokeAll() |
一个任务可能会由于一个异常而结束,因此它可能没有 “成功”。
- 具体的源码解释可以看第九节的内容
8、线程池的启动策略/增长策略?(蚂蚁金服)
1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2、当调用execute() 方法添加一个任务时,线程池会做如下判断:
- 1、如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
- 2、如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
- 3、如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
- 4、如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉;所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
9、线程池的内部实现
9.1、execute方法内部实现
execute()方法用来提交任务,代码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * clt记录着runState(运行状态)和workerCount(活动线程数) */ int c = ctl.get(); /* * workerCountOf方法取出低29位的值,表示当前活动的线程数; * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; * 并把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断; * 如果为true,根据corePoolSize来判断; * 如果为false,则根据maximumPoolSize来判断 */ if (addWorker(command, true)) return; /* * 如果添加失败,则重新获取ctl值 */ c = ctl.get(); } /* * 如果当前线程池是运行状态并且任务添加到队列成功 */ if (isRunning(c) && workQueue.offer(command)) { // 重新获取ctl值 int recheck = ctl.get(); // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了, // 这时需要移除该command // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回 if (! isRunning(recheck) && remove(command)) reject(command); /* * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法 * 这里传入的参数表示: * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动; * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 如果执行到这里,有两种情况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 如果失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command); }
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
- 1、如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 2、如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 3、如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
- 4、如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);
,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
9.2、addWorker方法
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,
- firstTask参数 用于指定新增的线程执行的第一个任务,
- core参数
- 为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
- false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,
代码如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); /* * 这个if判断 * 如果rs >= SHUTDOWN,则表示此时不再接收新任务; * 接着判断以下3个条件,只要有1个不满足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 * 2. firsTask为空 * 3. 阻塞队列不为空 * * 首先考虑rs == SHUTDOWN的情况 * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false; * 然后,如果firstTask为空,并且workQueue也为空,则返回false, * 因为队列中已经没有任务了,不需要再添加线程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程数(活动线程数) int wc = workerCountOf(c); // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较, // 如果为false则根据maximumPoolSize来比较。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // Re-read ctl // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来创建Worker对象 w = new Worker(firstTask); // 每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
注意一下这里的t.start()这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
9.3、Worker类
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
- Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:
- firstTask用它来保存传入的任务;
- thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
- 在调用构造方法时,需要把任务传入,这里通过
getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。 - Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用
interruptIdleWorkers方法
来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态; - 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
- 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
- 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
- tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
- 正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
9.4、runWorker方法
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; // 允许中断 w.unlock(); // allow interrupts // 是否因为异常退出循环 boolean completedAbruptly = true; try { // 如果task为空,则通过getTask来获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这里说明一下第一个if判断,目的是:
- 如果线程池正在停止,那么要保证当前线程是中断状态;
- 如果不是的话,则要保证当前线程不是中断状态;
这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()
方法会复位中断的状态。
总结一下runWorker方法的执行过程:
- 1、while循环不断地通过getTask()方法获取任务;
- 2、getTask()方法从阻塞队列中取任务;
- 3、如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 4、调用task.run()执行任务;
- 5、如果task为null则跳出循环,执行
processWorkerExit()
方法; - 6、runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
9.5、getTask方法
getTask方法用来从阻塞队列中取任务,代码如下:
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: * 1. rs >= STOP,线程池是否正在stop; * 2. 阻塞队列是否为空。 * 如果以上条件满足,则将workerCount减1并返回null。 * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // timed变量用于判断是否需要进行超时控制。 // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; // 对于超过核心线程数量的这些线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时 * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 如果减1失败,则返回重试。 * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果 r == null,说明已经超时,timedOut设置为true timedOut = true; } catch (InterruptedException retry) { // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } } }
- 这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。
- 由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
- 什么时候会销毁?
- 当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
9.6、processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); /* * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker; * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker; * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
- 至此,
processWorkerExit
执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit
方法,整个线程结束。
如图所示:
9.7、tryTerminate方法
tryTerminate方法根据线程池状态进行判断是否结束线程池,代码如下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默认什么都不做,留给子类实现 terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(ONLY_ONE);
的作用是 因为在getTask方法中执行workQueue.take()时,如果不执行中断会一直阻塞。在下面介绍的shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。
9.8、shutdown方法
shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判断 checkShutdownAccess(); // 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池 tryTerminate(); }
这里思考一个问题:在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
下面仔细分析一下:
- 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
- shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
- 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用
workQueue.take()
进行阻塞; - 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了
workQueue.take()
后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了; - 由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件;
- 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
- 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;
- 所以Worker继承自AQS,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。
下面就来分析一下interruptIdleWorkers方法。
9.9、interruptIdleWorkers方法
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。
为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。
9.10、shutdownNow方法
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中断所有工作线程,无论是否空闲 interruptWorkers(); // 取出队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow方法与shutdown方法类似,不同的地方在于:
- 设置状态为STOP;
- 中断所有工作线程,无论是否是空闲的;
- 取出阻塞队列中没有被执行的任务并返回。
shutdownNow方法执行完之后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。
9.11、sumbit()内部实现?
1.将提交的Callable任务会被封装成了FutureTask对象
2.FutureTask类也实现了Runnable接口,通过Executor.execute()提交到线程池,执行run方法;最终返回FutureTask对象
比较:两个方法都可以向线程池提交任务
- execute()方法的返回类型是void,它定义在Executor接口中
- submit()方法可返回持有计算结果的Future对象;定义在ExecutorService接口中
9.12、FutureTask详解
作用 | 代表异步计算的结果;Futuretask实现了Future接口和Runnable接口,因此,futuretask可以交给executor执行,也可以由调用线程直接执行。 |
什么时候使用? | 当一个线程需要等待另一个线程把某个任务执行完后他才能继续执行,此时可以使用futureTask |
实现原理 | 基于AQS(AQS是一个同步框架,他提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列)基于AQS实现的同步器包括:reentrantLock,semaphore,reentrantReadWriteLock,CountDownLatch和futureTask |
同步器的两种类型的操作 | 1、acquire操作。这个操作阻塞调用线程,直到AQS的状态允许这个线程继续执行;Futuretask的acquire操作为get()/get(long timeout,TimeUnit unit);2、release操作。改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞,Futuretask的release操作包括run()和cancel()方法 |
9.13、总结
本文比较详细的分析了线程池的工作流程,总体来说有如下几个内容:
- 分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
- 这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
- 介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
- 在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。
在向线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值。
11、线程池的监控
可以通过线程池提供的参数进行监控,有如下属性:
- 1、
getTaskCount
:线程池已经执行的和未执行的任务总数; - 2、
getCompletedTaskCount
:线程池在运行过程中已完成的任务数量,小于等于taskCount; - 3、
getLargestPoolSize
:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize; - 4、
getPoolSize
:线程池当前的线程数量; - 5、
getActiveCount
:当前线程池中正在执行任务的线程数量
如何使用:
- 通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展
线程池监控 Demo
- 换算关系:队列长度 + 正在执行任务的线程数量 + 已完成的任务数量 = 已经执行的和未执行的任务总数
- 例如:QueueSize:43,ActiveCount:6,CompletedTaskCount:518226,TaskCount:518275
@PreDestroy protected void terminated() { super.terminated(); if (Boolean.TRUE.equals(printLog)) { log.info("线程池:{}, 已完成的任务数量:{},曾经创建过的最大线程数量:{},当前核心线程数量:{},当前活动的线程数量:{}, 当前任务队列长度:{}", NAME, getCompletedTaskCount(), getLargestPoolSize(), getPoolSize(), getActiveCount(), getQueue().size()); } }
public class ImAsyncEventBus extends AsyncEventBus { public ImAsyncEventBus(Executor executor) { super(executor); monitorExecutor((ThreadPoolExecutor) executor); } /** * 增加线程池监听功能 */ public void monitorExecutor(ThreadPoolExecutor executor){ new Thread(()->{ while (true){ try { if(Objects.nonNull(executor)){ log.info("商品中心EventBus线程使用监控,当前线程状态,CurrentPoolSize:{},CorePoolSize:{},MaximumPoolSize:{},QueueSize:{},ActiveCount:{},CompletedTaskCount:{},TaskCount:{}," , executor.getPoolSize() , executor.getCorePoolSize() , executor.getMaximumPoolSize() , executor.getQueue().size() , executor.getActiveCount() , executor.getCompletedTaskCount() , executor.getTaskCount() ); } TimeUnit.SECONDS.sleep(5); } catch (Exception e) { log.error("商品中心EventBus线程使用监控,异常:",e); } } },"商品中心EventBus线程监控").start(); } }
12、JAVA多线程同步有哪些方法? 阿里
可以补充demo
1、使用 synchronized 关键字
- 这里讲的同步是指多个线程通过synchronized关键字这种方式来实现线程间的通信(本质上就是“共享内存”式的通信);
- 多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行
- 例如:线程B需要等待线程A执行完了 methodA() 方法之后,它才能执行 methodB() 方法。这样,线程A和线程B就实现了 通信
2、while轮询的方式
- 线程A不断地改变条件,线程ThreadB不停地通过while语句检测这个条件(list.size()==5)是否成立 ,从而实现了线程间的通信。但是这种方式会浪费CPU资源
- 轮询的条件的可见性问题:线程都是先把变量读取到本地线程栈空间,然后再去再去修改的本地变量。因此,如果线程B每次都在取本地的条件变量,那么尽管另外一个线程已经改变了轮询的条件,它也察觉不到,这样可能会造成死循环。
- 使用特殊域变量 volatile 实现线程同步
3、wait/notify机制
4、管道通信
- 就是使用
java.io.PipedInputStream
和java.io.PipedOutputStream
进行通信 - 分布式系统中说的两种通信机制:共享内存机制和消息通信机制
①中的synchronized关键字和②中的while轮询 “属于” 共享内存机制,由于是轮询的条件使用了volatile关键字修饰时,这就表示它们通过判断这个“共享的条件变量“是否改变了,来实现进程间的交流
管道通信,更像消息传递机制,也就是说:通过管道,将一个线程中的消息发送给另一个
5、Exchanger(线程间交换数据)
提供了在线程间交换数据的一种手段,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,他会一直等待第二个线程也执行此方法,当两个线程都到达同步点时,这两个线程就交换数据。
- 应用场景:
1、用于遗传算法:选两个人作为交配对象,需要交换两人的数据,并使用交叉规则得出2个交配结果
2、用于校对工作:我们需要将纸质银行流水通过人功能的方式录入成电子银行流水,为避免错误,采用AB岗录入,对两个excel数据进行校对,看是否录入一致;可以使用exchange(V x,long timeout,TimeUnit unit) //设置最大等待时长
6、使用阻塞队列实现线程同步
7、使用原子变量实现线程同步
todo 需要给出demo
参考资料
1、深入理解Java线程池:ThreadPoolExecutor
Action1:线程池有多个线程同时没取到任务,会全部回收吗?
这个题不是很好理解,举个例子:线程池核心线程数是5,当前工作线程数为6(6>5,意味着当前可以触发线程回收),如果此时有3个线程同时超时没有获取到任务,这3个线程会都被回收销毁吗。
也是非常刁钻的一题,非常细节。但是即使我们记不得源码的细节了,还是有办法去推敲出来的。
- 思路:这道题的核心点在于有多个线程同时超时获取不到任务。正常情况下,此时会触发线程回收的流程。
- 但是我们知道,正常不设置 allowCoreThreadTimeOut 变量时,线程池即使没有任务处理,也会保持核心线程数的线程。
- 如果这边3个线程被全部回收,那此时线程数就变成了3个,不符合核心线程数5个,所以这边我们可以首先得出答案:不会被全部回收。这个时候面试官肯定会问为什么?
- 根据答案不难推测,为了防止本题的这种并发回收问题的出现,线程回收的流程必然会有并发控制。没错,源码中确实又是使用 CAS 来进行并发控制,从而保证在本例中只会有一个线程成功被回收。
- 本例源码在:getTask() 方法中。
/* * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时 * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 如果减1失败,则返回重试。 * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }