Tomcat的线程池Executor除了实现Lifecycle接口外,基本和JDK的ThreadPoolExecutor一致,以前是直接继承了JDK的ThreadPoolExecutor,并改写部分逻辑,在最新的代码上(Tomcat 10,2021.7.22以后),甚至是直接抄了一份,改写部分逻辑,然后再通过组合的方式使用。
主要区别是线程工厂、任务队列和拒绝策略上,先看看JDK线程池的执行策略,以及在这几个方面有什么缺陷。
JDK的线程池执行流程
首先需要知道线程池的几个基本概念:核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、阻塞任务队列、拒绝策略。JDK的线程池执行流程如下,当有新任务来临时:
文章内容收录到个人网站,方便阅读:hardyfish.top/
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
原生线程池在达到核心线程数时,是优先添加队列,这样比较适合CPU密集型任务(认为新建线程不如让任务排队)。
Tomcat面临的问题
但是Tomcat是属于IO密集型任务,在Tomcat看来,原生的线程池主要有两个问题:
1. 阻塞任务队列
JDK可选的几个阻塞任务队列无非是:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、LinkedTransferQueue。后两个显然不适合,候选只有前两个:链表和数组。
对于原生LinkedBlockingQueue,无界,那么线程池就不会满足上述第4步的条件,线程会在达到corePoolSize后不再新建,而是一直加入队列。而作为IO密集型的Tomcat,显然是希望此时创建新线程。
对于原生ArrayBlockingQueue,有界,但是线程池在第5步的时候,就会执行拒绝策略。
2. 拒绝策略
当满足第5步的时候,原生线程池就会执行拒绝策略,具体来说是j.u.c.RejectedExecutionHandler接口。JDK默认了四个实现:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
- 抛出异常(默认)
- 直接丢弃
- 丢弃最早的任务
- 交给调用者执行
Tomcat的线程池
因为Tomcat的任务属于IO密集型,大概率不会长时间占用CPU资源。即期望任务堆积时,优先创建线程来处理,而不是入队,但是又不想任务被丢弃或交给调用者处理(想始终交给线程池处理)。
所以做了如下改造:
- 阻塞任务队列继承了LinkedBlockingQueue(无界),但是自身持有ThreadPoolExecutor的引用,又改写了插入方法:
public class TaskQueue extends LinkedBlockingQueue<Runnable> { private transient volatile ThreadPoolExecutor parent = null; @Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) { return super.offer(o); } //we are maxed out on threads, simply queue the object // 核心线程数=最大线程数,无脑添加 if (parent.getPoolSize() == parent.getMaximumPoolSize()) { return super.offer(o); } //we have idle threads, just add it to the queue // 有空闲线程(即核心线程有空闲的),则添加,核心线程自己会去取任务执行 if (parent.getSubmittedCount()<=(parent.getPoolSize())) { return super.offer(o); } //if we have less threads than maximum force creation of a new thread // 核心线程数小于最大线程数,这里改写了JDK线程池的第4步逻辑,不是等队列满了再新建线程,而是优先新建线程 if (parent.getPoolSize()<parent.getMaximumPoolSize()) { return false; } //if we reached here, we need to add it to the queue // 其他情况,无脑添加 return super.offer(o); } }
- 当活跃线程数达到最大线程数,即不能再创建新线程时,将执行拒绝策略。这里对应原生JDK的第5步:
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
但此时队列其实是没满的,只是满足了上述最后一个if (parent.getPoolSize()
Tomcat的处理也是抛出异常,但是在异常处理时,又强制将任务插入队列:
@Override public void execute(Runnable command) { submittedCount.incrementAndGet(); try { executeInternal(command); } catch (RejectedExecutionException rx) { if (getQueue() instanceof TaskQueue) { // If the Executor is close to maximum pool size, concurrent // calls to execute() may result (due to Tomcat's use of // TaskQueue) in some tasks being rejected rather than queued. // If this happens, add them to the queue. final TaskQueue queue = (TaskQueue) getQueue(); if (!queue.force(command)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } else { submittedCount.decrementAndGet(); throw rx; } } }
所谓强制插入force其实就是直接super.offer而已。
总结下来就是Tomcat的线程池总是优先尝试新建线程,如果达到上限了,再尝试将任务放入阻塞队列。由于是IO密集型任务,执行时间一般都不会太长,所以阻塞队列大概率不会排队太多造成OOM。
此外Tomcat还自定义了线程工厂,这个比较简单,只是在新建线程时,将调用工厂的类加载器传递给线程上下文加载器:
@Override public Thread newThread(Runnable r) { TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(threadPriority); // Set the context class loader of newly created threads to be the // class loader that loaded this factory. This avoids retaining // references to web application class loaders and similar. t.setContextClassLoader(getClass().getClassLoader()); return t; }
测试验证
写个简单的程序测试下:线程池固定核心线程数1,最大线程数5,一次提交10个任务。
JDK线程池
public class ThreadPoolTest { public static void main(String[] args) { // 构造线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {} }); } System.out.println("active: " + executor.getActiveCount()); System.out.println("queue: " + executor.getQueue().size()); } }
输出:
active: 1
queue: 9
在队列可用的前提下,JDK线程池优先让任务排队。
Tomcat线程池
public class ThreadPoolTest { public static void main(String[] args) { // 阻塞队列使用Tomcat的TaskQueue TaskQueue queue = new TaskQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS, queue); // 需要设置parent引用 queue.setParent(executor); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {} }); } System.out.println("active: " + executor.getActiveCount()); System.out.println("queue: " + executor.getQueue().size()); } }
输出:
active: 5
queue: 5
在线程数未达到最大线程数时,Tomcat线程池优先创建线程执行任务。
总结
简单来说:原生JDK的线程池是优先将任务添加到阻塞队列,等队列满再尝试创建线程,适合IO密集型任务。Tomcat属于IO密集型,所以总是优先尝试新建线程,线程池满载了,再添加任务到阻塞队列里排队等待。
Dubbo中也有极其相似的处理:EagerThreadPoolExecutor。
不过个人感觉这种处理方式不太好,在catch块中执行逻辑总觉得不太合适,感觉单独写个reject接口实现来处理比较好,类似如下:
private static class ForceAdd implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { BlockingQueue<Runnable> queue = executor.getQueue(); if (queue instanceof TaskQueue) { final TaskQueue q = (TaskQueue) queue; if (!q.force(r)) { executor.submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } else { executor.submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } }
不过Tomcat和Dubbo都采用在catch块中处理,暂未找到这样做的相关原因描述。