带你读《2022技术人的百宝黑皮书》——合理使用线程池以及线程变量(4)https://developer.aliyun.com/article/1340065?groupCode=taobaotech
Tomcat 自 定 义 ThreadPoolExecutor Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
/** * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. * If a RejectedExecutionHandler is not specified a default one will be configured * and that one will always throw a RejectedExecutionException 6 * */ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { 9 /** *The number of tasks submitted but not yet finished. This includes tasks *in the queue and tasks that have been handed to a worker thread but the *latter did not start executing the task yet. *This number is always greater or equal to {@link #getActiveCount()}. */ // 新增的submittedCount成员变量,用于统计已提交但还未完成的任务数private final AtomicInteger submittedCount = new AtomicInteger(0); private final AtomicLong lastContextStoppedTime = new AtomicLong(0L); // 构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); // 预启动所有核心线程prestartAllCoreThreads(); }
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submit- tedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后, 再抛出RejectedExecutionException。
@Override public void execute(Runnable command) { execute(command,0,TimeUnit.MILLISECONDS); } public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; }
带你读《2022技术人的百宝黑皮书》——合理使用线程池以及线程变量(6)https://developer.aliyun.com/article/1340063?groupCode=taobaotech