常见的三种工厂类线程池
1、newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
由于队列缓冲区为空,每来一个任务时,都会在必要时新建线程执行任务直到到达Integer.MAX_VALUE,这就有可能导致大量的线程被创建,进而造成oom
2、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { //超过nThreads个线程后会无限制的往阻塞队列放入线程 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可能因为阻塞队列被撑爆造成oom
3、newSingleThreadExecutor
创建一个单线程化的线程池
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { //超过1个线程后会无限制的往阻塞队列放入线程 return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
4、newScheduledThreadPool
定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
5、newWorkStealingPool
现代线程池使用偷窃工作-每个线程都有自己的队列.当线程池线程产生任务时,它将任务排队到他自己的队列中.当线程池线程想要使任务出队时-它首先尝试将任务从他自己的队列中出队,如果没有,则从其他线程队列中"窃取"工作.这确实减少了线程池的争用并提高了性能,ForkJoinPool也利用偷窃工作(类似于mapreduce)。
newWorkStealingPool适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
线程池设计原理
线程池常用阻塞队列
1.SynchronousQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
拥有公平(FIFO)和非公平(LIFO)策略,非公平侧罗会导致一些数据永远无法被消费的情况
使用SynchronousQueue阻塞队列一般用于构造newCachedThreadPool
2.LinkedBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));
LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
3.ArrayBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));
ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。
线程池拒绝策略
1、CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
2、AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
3、DiscardPolicy:不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} 这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
4、DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队
如何合理设计线程池里的线程数
1.IO密集型 由于线程并不是一直在运行,所以可以尽可能的多配置线程
线程数 = CPU可用核心数/(1-阻塞系数)
阻塞系数= IO时间/(IO时间+CPU时间)
计算密集型任务的阻塞系数为0,
IO密集型任务的阻塞系数则接近于1。
一个完全阻塞的任务是注定要挂掉的,所以我们无须担心阻塞系数会达到1。
阻塞系数可以采用一些性能分析工具或java.lang.managenment
API来确定线程话在系统I/O操作上的时间与CPU密集任务所消耗的时间比值。
2.CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数相当的大小。
线程池内运行的线程抛异常,线程池会怎么办
当线程池中线程执行任务的时候,任务出现未被捕获的异常的情况下,线程池会将允许该任务的线程从池中移除并销毁,且同时会创建一个新的线程加入到线程池中;可以通过ThreadFactory自定义线程并捕获线程内抛出的异常,也就是说甭管我们是否去捕获和处理线程池中工作线程抛出的异常,这个线程都会从线程池中被移除。(详细分析见文章)
线程池状态
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
如何合理设置线程池队列长度
tomcat、Dubbo 等业界成熟的产品是如何设置线程队列,我们主要分析下这两个中间件
1.JDK线程池策略
corePoolSize < maximumPoolSize时
1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
2. 如果此时线程池中的数量大于等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理添加的任务。
4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
corePoolSize==maximumPoolSize时
队列未满新的任务会加入队列里,如果队列满了,那么会用空闲线程来处理新的任务。没有空闲线程则执行拒绝策略
线程池策略流程
2.Tomcat线程池策略
Tomcat的线程池队列是无限长度的,但是线程池会一直创建到maximumPoolSize,然后才把请求放入等待队列中
tomcat 任务队列org.apache.tomcat.util.threads.TaskQueue其继承与LinkedBlockingQueue,覆写offer方法。
@Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o); //线程个数小于MaximumPoolSize会创建新的线程。 //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); }
3.Dubbo线程池策略
Dubbo 提供3种线程池模型即:FixedThreadPool、CachedThreadPool(客户端默认的)、LimitedThreadPool(服务端默认的),从源码可以看出,其默认的队列长度都是0,当队列长度为0 ,其使用是无缓冲的队列SynchronousQueue,当运行线程超过maximumPoolSize则拒绝请求。
总结
线程池的任务队列本来起缓冲作用,但是如果设置的不合理会导致线程池无法扩容至max,这样无法发挥多线程的能力,导致一些服务响应变慢。
队列长度要看具体使用场景,取决服务端处理能力以及客户端能容忍的超时时间等
建议采用tomcat的处理方式,core与max一致,先扩容到max再放队列,不过队列长度要根据使用场景设置一个上限值,如果响应时间要求较高的系统可以设置为0。
相关文章:
https://mp.weixin.qq.com/s/2pspUsnFOXDNVfcB_tj22w