本文从线程池和线程变量的原理和使用出发,结合实例给出最佳使用实践,帮助各开发人员构建出稳定、高效的java应用服务。
背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
线程池概述
▐ 什么是线程池
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
▐ 为什么要使用线程池
总体来说,线程池有如下的优势:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的使用
▐ 线程池创建&核心参数设置
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
- corePoolSize参数
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
- maximumPoolSize参数
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
- keepAliveTime参数
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
- timeUnit参数
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
- blockingQueue参数
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
- ArrayBlockingQueue :一个数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序,支持公平访问队列。
- LinkedBlockingQueue :一个由链表结构组成的可选有界阻塞队列,如果不指定大小,则使用Integer.MAX_VALUE作为队列大小,按照FIFO的原则对元素进行排序。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认情况下采用自然顺序排列,也可以指定Comparator。
- DelayQueue:一个支持延时获取元素的无界阻塞队列,创建元素时可以指定多久以后才能从队列中获取当前元素,常用于缓存系统设计与定时任务调度等。
- SynchronousQueue:一个不存储元素的阻塞队列。存入操作必须等待获取操作,反之亦然。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者。
- LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素。
- threadFactory参数
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
new ThreadFactoryBuilder().setNameFormat("general-detail-batch-%d").build()
- RejectedExecutionHandler参数
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
- ThreadPoolExecutor.AbortPolicy:默认策略,当任务队列满时抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃掉不能执行的新任务,不抛任何异常。
- ThreadPoolExecutor.CallerRunsPolicy:当任务队列满时使用调用者的线程直接执行该任务。
- ThreadPoolExecutor.DiscardOldestPolicy:当任务队列满时丢弃阻塞队列头部的任务(即最老的任务),然后添加当前任务。
▐ 线程池状态转移图
ThreadPoolExecutor线程池有如下几种状态:
- RUNNING:运行状态,接受新任务,持续处理任务队列里的任务;
- SHUTDOWN:不再接受新任务,但要处理任务队列里的任务;
- STOP:不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务;
- TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程,当线程池执行terminated()方法时进入TIDYING状态;
- TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕,terminated()方法执行完成;
▐ 线程池任务调度机制
线程池提交一个任务时任务调度的主要步骤如下:
- 当线程池里存活的核心线程数小于corePoolSize核心线程数参数的值时,线程池会创建一个核心线程去处理提交的任务;
- 如果线程池核心线程数已满,即线程数已经等于corePoolSize,新提交的任务会被尝试放进任务队列workQueue中等待执行;
- 当线程池里面存活的线程数已经等于corePoolSize了,且任务队列workQueue已满,再判断当前线程数是否已达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务;
- 如果当前的线程数已达到了maximumPoolSize,还有新的任务提交过来时,执行拒绝策略进行处理。
核心代码如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
▐ Tomcat线程池分析
- Tomcat请求处理过程
Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
- 使用 ProtocolHandler 接口来封装I/O模型和应用层协议的差异,其中I/O模型可以选择非阻塞I/O、异步I/O或APR,应用层协议可以选择HTTP、HTTPS或AJP。ProtocolHandler将I/O模型和应用层协议进行组合,让EndPoint只负责字节流的收发,Processor负责将字节流解析为Tomcat Request/Response对象,实现功能模块的高内聚和低耦合,ProtocolHandler接口继承关系如下图示。
- 通过适配器 Adapter 将Tomcat Request对象转换为标准的ServletRequest对象。
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
- Tomcat线程池创建
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
- Tomcat自定义ThreadPoolExecutor
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
/** * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. * If a RejectedExecutionHandler is not specified a default one will be configured * and that one will always throw a RejectedExecutionException * */ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { /** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */ // 新增的submittedCount成员变量,用于统计已提交但还未完成的任务数 private final AtomicInteger submittedCount = new AtomicInteger(0); private final AtomicLong lastContextStoppedTime = new AtomicLong(0L); // 构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); // 预启动所有核心线程 prestartAllCoreThreads(); } }
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
@Override public void execute(Runnable command) { execute(command,0,TimeUnit.MILLISECONDS); } public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
- Tomcat自定义任务队列
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
/** * As task queue specifically designed to run with a thread pool executor. The * task queue is optimised to properly utilize threads within a thread pool * executor. If you use a normal queue, the executor will spawn threads when * there are idle threads and you wont be able to force items onto the queue * itself. */ public class TaskQueue extends LinkedBlockingQueue<Runnable> { public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected } @Override public boolean offer(Runnable o) { // 1. parent为线程池,Tomcat中为自定义线程池实例 //we can't do any checks if (parent==null) return super.offer(o); // 2. 当线程数达到最大线程数时,新提交任务入队 //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); // 3. 当提交的任务数小于线程池中已有的线程数时,即有空闲线程,任务入队即可 //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); // 4. 【关键点】如果当前线程数量未达到最大线程数,直接返回false,让线程池创建新线程 //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; // 5. 最后的兜底,放入队列 //if we reached here, we need to add it to the queue return super.offer(o); } }
- Tomcat自定义任务线程
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
/** * A Thread implementation that records the time at which it was created. * */ public class TaskThread extends Thread { private final long creationTime; public TaskThread(ThreadGroup group, Runnable target, String name) { super(group, new WrappingRunnable(target), name); this.creationTime = System.currentTimeMillis(); } /** * Wraps a {@link Runnable} to swallow any {@link StopPooledThreadException} * instead of letting it go and potentially trigger a break in a debugger. */ private static class WrappingRunnable implements Runnable { private Runnable wrappedRunnable; WrappingRunnable(Runnable wrappedRunnable) { this.wrappedRunnable = wrappedRunnable; } @Override public void run() { try { wrappedRunnable.run(); } catch(StopPooledThreadException exc) { //expected : we just swallow the exception to avoid disturbing //debuggers like eclipse's log.debug("Thread exiting on purpose", exc); } } } }
- 思考&小结
- Tomcat为什么要自定义线程池和任务队列实现?
JUC原生线程池在提交任务时,当工作线程数达到核心线程数后,继续提交任务会尝试将任务放入阻塞队列中,只有当前运行线程数未达到最大设定值且在任务队列任务满后,才会继续创建新的工作线程来处理任务,因此JUC原生线程池无法满足Tomcat快速响应的诉求。 - Tomcat为什么使用无界队列?
Tomcat在EndPoint中通过acceptCount和maxConnections两个参数来避免过多请求积压。其中maxConnections为Tomcat在任意时刻接收和处理的最大连接数,当Tomcat接收的连接数达到maxConnections时,Acceptor不会读取accept队列中的连接;这时accept队列中的线程会一直阻塞着,直到Tomcat接收的连接数小于maxConnections(maxConnections默认为10000,如果设置为-1,则连接数不受限制)。acceptCount为accept队列的长度,当accept队列中连接的个数达到acceptCount时,即队列满,此时进来的请求一律被拒绝,默认值是100(基于Tomcat 8.5.43版本)。因此,通过acceptCount和maxConnections两个参数作用后,Tomcat默认的无界任务队列通常不会造成OOM。
/** * Allows the server developer to specify the acceptCount (backlog) that * should be used for server sockets. By default, this value * is 100. */ private int acceptCount = 100; private int maxConnections = 10000;








