前提
Tomcat 10.1.x
Tomcat线程池介绍
Tomcat线程池,源于JAVA JDK自带线程池。由于JAVA JDK线程池策略,比较适合处理 CPU 密集型任务,但是对于 I/O 密集型任务,如数据库查询,rpc 请求调用等,不是很友好,所以Tomcat在其基础上进行了扩展。
任务处理流程
扩展线程池相关源码简析
Tomcat中定义了一个StandardThreadExecutor
类,该类实现了org.apache.catalina.Executor
,org.apache.tomcat.util.threads.ResizableExecutor
接口
该类内部定义了namePrefix
(创建的线程名称前缀,默认值tomcat-exec-
),maxThreads
(最大线程数,默认值 200),minSpareThreads
(最小线程数,即核心线程数,默认值 25),maxIdleTime
(线程最大空闲时间,毫秒为单位,默认值60秒),maxQueueSize
(最大队列大小,默认值 Integer.MAX_VALUE
)等属性,此外,还定义了一个org.apache.tomcat.util.threads.ThreadPoolExecutor
类型的执行器对象,一个execute(Runnable command)
方法
当execute(Runnable command)
方法被调用时,会调用上述ThreadPoolExecutor
类对象的execute
方法
org.apache.catalina.core.StandardThreadExecutor.java
import org.apache.catalina.Executor; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleState; import org.apache.catalina.util.LifecycleMBeanBase; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.threads.ResizableExecutor; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; import org.apache.tomcat.util.threads.ThreadPoolExecutor; public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor, ResizableExecutor { protected static final StringManager sm = StringManager.getManager(StandardThreadExecutor.class); // ---------------------------------------------- Properties /** * Default thread priority */ protected int threadPriority = Thread.NORM_PRIORITY; /** * Run threads in daemon or non-daemon state */ protected boolean daemon = true; /** * Default name prefix for the thread name */ protected String namePrefix = "tomcat-exec-"; /** * max number of threads */ protected int maxThreads = 200; /** * min number of threads */ protected int minSpareThreads = 25; /** * idle time in milliseconds */ protected int maxIdleTime = 60000; /** * The executor we use for this component */ protected ThreadPoolExecutor executor = null; /** * the name of this thread pool */ protected String name; /** * The maximum number of elements that can queue up before we reject them */ protected int maxQueueSize = Integer.MAX_VALUE; /** * After a context is stopped, threads in the pool are renewed. To avoid * renewing all threads at the same time, this delay is observed between 2 * threads being renewed. */ protected long threadRenewalDelay = org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY; private TaskQueue taskqueue = null; // ---------------------------------------------- Constructors public StandardThreadExecutor() { //empty constructor for the digester } //....此处代码已省略 @Override public void execute(Runnable command) { if (executor != null) { // Note any RejectedExecutionException due to the use of TaskQueue // will be handled by the o.a.t.u.threads.ThreadPoolExecutor executor.execute(command); } else { throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); } } //....此处代码已省略 }
当org.apache.tomcat.util.threads.ThreadPoolExecuto
类对象的execute(Runnable command)
方法被调用时,会调用该类定义的一个executeInternal
方法,并在捕获到RejectedExecutionException
异常时,尝试再次将任务放入工作队列中。
executeInternal
方法中,通过代码可知,当前线程数小于核心线程池大小时,会创建新线程,否则,会调用workQueue
对象(org.apache.tomcat.util.threads.TaskQueue
类型)的offer
方法,将任务进行排队。Tomcat通过控制workQueue.offer()
方法的返回值,实现了当前线程数超过核心线程池大小时,优先创建线程,而不是让任务排队。
org.apache.tomcat.util.threads.ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService { //...此处代码已省略 @Override public void execute(Runnable command) { submittedCount.incrementAndGet(); try { executeInternal(command); } catch (RejectedExecutionException rx) { if (getQueue() instanceof TaskQueue) { // If the Executor is close to maximum pool size, concurrent // calls to execute() may result (due to Tomcat's use of // TaskQueue) in some tasks being rejected rather than queued. // If this happens, add them to the queue. final TaskQueue queue = (TaskQueue) getQueue(); if (!queue.force(command)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } else { submittedCount.decrementAndGet(); throw rx; } } } /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@link RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ private void executeInternal(Runnable command) { if (command == null) { throw new NullPointerException(); } /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 当前线程数小于核心线程数时, if (addWorker(command, true)) { // 创建线程 return; } c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //workQueue.offer(command)为false时,会走以下的else if分支,创建线程 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) { reject(command); } } //...此处代码已省略 }
org.apache.tomcat.util.threads.TaskQueue
继承于java.util.concurrent.LinkedBlockingQueue
,并重写了offer
(排队任务的方法),该方法中,当当前线程数大于核心线程数,小于最大线程数时,返回false
,导致上述executeInternal
方法中workQueue.offer(command)
为false
,进而导致该分支代码不被执行,执行addWorker(command, false)
方法,创建新线程。
org.apache.tomcat.util.threads.TaskQueue
import java.util.Collection; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.tomcat.util.res.StringManager; /** * As task queue specifically designed to run with a thread pool executor. The * task queue is optimised to properly utilize threads within a thread pool * executor. If you use a normal queue, the executor will spawn threads when * there are idle threads and you won't be able to force items onto the queue * itself. */ public class TaskQueue extends LinkedBlockingQueue<Runnable> { //...此处代码已省略 /** * Used to add a task to the queue if the task has been rejected by the Executor. * * @param o The task to add to the queue * * @return {@code true} if the task was added to the queue, * otherwise {@code false} */ public boolean force(Runnable o) { if (parent == null || parent.isShutdown()) { throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); } return super.offer(o); //forces the item onto the queue, to be used if the task is rejected } @Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) { return super.offer(o); } //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) { return super.offer(o); } //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<=(parent.getPoolSize())) { return super.offer(o); } //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) { return false; } //if we reached here, we need to add it to the queue return super.offer(o); } //...此处代码已省略 }