一为什么离不开线程池?
多线程开发是提高程序性能的一种方式,但线程的创建与销毁,以及运行线程上下文切换都是需要消耗cpu资源的,相对来说任务的执行所占整个线程运行的cpu时间越短,线程的运行效率也相应越低。而在有些系统中,我们需要反复频繁地创建线程,例如tomcat,每个http的处理handle都必须运行在一个线程中,这样在并访问量很大的情况下,就会造成系统中创建了很多系统线程,使得cpu频繁的进行线程上下文切换,从而导致了整个系统的行能低下。为了解决这样的问题,编程领域设计了线程池来解决线程切换带来的性能损耗。
线程池的设计思想是创建一定数量的运行线程,将要执行的任务,放入线程池,线程池会自动分配线程去执行任务,执行完任务的线程又会被放入池中,等待新任务的到来,而不是退出线程,从而实现了线程的重复利用,避免了系统反复创建销毁线程,造成的性能损耗。另一方面,线程池将程序员的关注点由线程转向了任务,对于使用者来说,线程池就像一个盒子,使用者无需关心线程操作相关的实现细节,可以将更多的精力放在任务本身上,只需在合适的时机将任务丢给线程池即可。线程池将任务与线程进行解绑,更有利于将程序解耦。线程与线程池的编程模型如下图所示:
二线程池怎么玩?
首先线程池的使用需要通过ThreadPoolExecutor的构造函数来创建一个线程池:
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
构造函数参数意义如下:
参数 | 意义 |
corePoolSize | 线程池核心线程数量 |
maximumPoolSize | 线程池最大线程数量 |
keepAliveTime | 线程保持时间,空闲线程可以存活时间 |
TimeUnit | 线程保持时间的单位(keepAliveTime的单位) |
workQueue | 任务队列 |
threadFactory | 线程创建工厂 |
RejectedExecutionHandler | 线程数超过最大线程数后,任务将被拒绝并回调的handler |
在我们创建了一个线程池后,便可以向线程池中提交一个Runnable类型的任务了:
threadPool.execute( new Runnable(){ public void run(){ ...//任务代码 } } )
这样我们就将任务提交到了线程池去运行了,至于线程池如何实现任务运行,就不是我们需要考虑的事情了,从而将任务与线程进行了解耦。但是我们也无法得知任务是否执行成功,如果我们需要得知任务的执行结果,则需要使用ThreadPoolExecutor.submit(Runnable task)方法来向线程池提交任务,该方法会返回一个Futrue类型的结果,通过以下代码便可以判断任务是否执行成功了。
Future<Object> threadFuture = threadPoolExecutor.submit(task); try{ Object resualt = threadFuture.get(); }catch (InterruptedException e){ // 处理线程中断异常 }catch (ExecutionException e){ // 处理无法执行异常 } finally { threadPoolExecutor.shutdown(); }
三ThreadPoolExecutor的执行流程
上一节我们简单描述了线程池的使用方式,这里我们来探究一下ThreadPoolExecutor的执行流程,其流程如下:
- 创建线程池,等待任务执行。
- 当任务提交给线程池后,会判断核心线程池是否已满,即当前线程数与corePoolSize进行比较,如果核心线程池未满,则创建新线程来执行任务,如果核心线程池已满则将任务加入任务队列BlockingQueue中,等待执行。
- 如果任务队列也满了,则ThreadPoolExecutor会继续创建新的线程来处理任务,但是线程池中线程数目不得超过最大线程数maximumPoolSize,否则线程池将会采取饱和策略,拒绝处理任务,并将调用用户设置的RejectedExecutionHandler策略函数进行处理。这里需要注意,只有BlockingQueue为有界队列时,maximumPoolSize参数才会有作用,否者无界BlockingQueue不可能满,不会触发线程池来处理任务队列已满的情况,无界队列使用不当可能造成线程池无休止创建线程的现象。
- 线程池中的线程处理完当前任务后,会从任务队列中尝试取任务,如果取到任务,则执行任务,否则等待keepAliveTime时间,如果在keepAliveTime内都没有取到任务,则该线程会退出。
线程池执行流程图如下:
execute的实现源码如下(JDK8):
public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //step 1: 核心线程数判断 if (addWorker(command, true)) //step 1.1 添加核心线程执行任务 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //step 2尝试任务加入队列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //step 2.1判断线程池是否运行 reject(command); else if (workerCountOf(recheck) == 0) //step 2.2判断当前工作线程数量如果等于0,直接添加工作线程 addWorker(null, false); } else if (!addWorker(command, false)) //step 3 任务无法队列,尝试创建线程执行任务 reject(command); }
读者结合笔者的注释,应该不难理解这段源码。这里我们需要注意一下线程池的控制变量ctl,该变量是一个AtomicInteger类型的原子变量,这个变量在这个线程池的工作中至关重要,该变量控制了线程池的两个属性:线程的数目和线程池的当前运行状态(线程池拥有的状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED)。这个变量设计的非常巧妙,一方面减少了线程池的变量数量,更重要的一方面是,该变量是原子类型变量,线程池的实现函数中,往往需要同时获取这两个属性,如果将两个属性放入一个原子变量中,根据Atomic类支持线程的重入,线程池也就只需获取一把锁,便可以控制线程池的两个属性,这里实际上变相减少了一把锁的使用,非常巧妙,Doug Lea不愧被称为Java并发大师!下面源码展示线程池通过ctl变量的位运算获取线程属性的操作(JDK8):
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static int workerCountOf(int c) { return c & CAPACITY; } private static int runStateOf(int c) { return c & ~CAPACITY; }
这里我们可以从ThreadPoolExecutor的执行流程中看到,线程池并不是一开始就创建好coolPoolSize个线程,而是随着任务的添加,来逐步添加工作线程的。当然线程池也提供了线程池的预热功能prestartAllThreads(),该方法线程池会通过addWorker(null, true)函数来创建coolPoolSize个核心线程来等待任务的到来,addWorker()方法的分析见下节。
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
四Worker工作线程
在ThreadPoolExecutor.execute()方法中调用了addworker()方法,其中方法addworker(Runnable firstTask, boolean core)的第一个参表示该工作线程创建后第一个执行的任务,该参数为null时,表示线程池只是创建了一个等待任务的工作线程;第二参数表示添加的线程是否是核心线程,用于区分线程池使用coolPoolSize还是maximumPoolSize进行线程池线程数目的控制。在addworker()方法中创建了一个Worker对象,一个Worker对象就是ThreadPoolExecutor中的一个线程。当一个任务提交时,Worker对象就会使用线程工厂创建一个线程,并将该线程与当前firstTask绑定,Worker对象就像线程池工厂中的劳工一样,会不停的获取新的任务来执行。新创建的Worker线程都会保存在线程池的HashSet<Worker>成员变量中,这里我们来看一下工作线程的运行核心函数Worker.run()的实现(JDK8,部分代码省略):
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); //默认线程工厂会调用创建一个线程,并与firstTask绑定 } public void run() { runWorker(this); } }
ThreadPoolExecutor的默认线程工厂newThread(Runnable)的实现如下,这里便将Worker与实际线程绑定了,并使用firstTask创建了线程:
static class DefaultThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
而Worker.run()被调用后,Worker对应的线程会调用ThreadPoolExecute.runWorker()来执行firstTask任务,并循环从任务队列中取任务:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } } finally { processWorkerExit(w, completedAbruptly); } }
那么问题来了,runWorker()方法会在worker工作线程没有取到任务时,退出循环,此时工作线程便会退出,那keepAliveTime参数是如何控制工作线程去任务的存活时间的?
奥秘就在取任务getTask()的实现中,Worker.getTask()实现如下(JDK 8):
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
这里就清晰了,原来Worker工作线程会尝试在keepAliveTime时间内从workQueue队列中取任务,线程的超时控制依赖于队列取元素的超时控制,也就是说在keepAliveTime时间类,工作线程会阻塞在getTask()方法上,直到线程取到任务或者取任务超时。
Worker的时序图如下:
五线程池的饱和策略
饱和策略是线程池应对任务队列和线程池饱和时所采取的策略,ThreadPoolExecutor提供了setRejectedExecutionHandler()方法设置自定义饱和策略的接口,如果没有设置该接口,Java便会采取默认饱和策略AbortPolicy才处理,JDK提供了4中饱和策略:
- AbortPolicy : 默认饱和策略,直接抛出异常。
- CallerRunsPolicy : 使用调用者线程来执行任务。
- DiscardOldestPolicy : 丢弃队列中最近一个任务,并执行当前任务。
- DiscardPolicy : 不处理,直接丢弃当前任务。
这四种JDK提供的饱和策略都实现了RejectedExecutionHandler接口,并且只有AbortPolicy策略才会抛出RejectedExecutionException异常,如果实际开发环境中需要实现自定义饱和策略,可以参考以上四种饱和策略的实现方式。
六线程池的关闭
做人做事要善始善终,软件开发也一样,占用了的资源要记得释放,使用了的线程要记得归还,有借有还,再借不难。线程池不适合处理需要长期运行的任务,长任务应该开辟专用线程进行处理。线程池提供了shutdown()和shutdownNow()两种方式来主动关闭线程池,虽然两者都可以关闭线程池,但是还是有一定区别的:
- shutdown():当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
- shutdownNow():线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep/wait/Condition/定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
在调用shutdown()时,shutdown()只会将空闲线程进行关闭,而shutdownNow()方法会尝试关闭所有线程,因此如果任务是否正常执行完,对于系统没有影响,可以使用shutdownNow()方法,一般开发中都会使用shutdown()来优雅的关闭线程池。
七线程池的配置原则
线程池提供了统一管理线程的机制,但是线程池的运行效率的高低,一方面也需要程序员自己进行调优把控。在HotSpot虚拟机中Java线程的创建使用了底层操作系统的线程创建接口来系统线程,并不是伪线程(这里说句题外话,有同学和我说python使用的是伪线程,其实自己写个多线程小程序就可以判断出是python虚拟机采用的是真实线程还是伪线程。学习软件开发,门槛需要自己迈,坑需要自己踩,多动手,不可懒)。我们知道线程是CPU执行的基本单位,单个处理器同一时间内只能运行一个线程,因此线程池的大小的配置,也应该与CPU的核心数目相关(通过Runtime.getRuntime().availableProcessors()方法可以获取到当前系统处理器数目),过多的创建线程并不一定能带来系统总体性能的提升,反而会使处理器性能浪费在频繁的线程切换中。线程数目与效率的关系图如下:
那么线程池到底应该配置多大,才能高效的利用线程池?这里没有固定的答案,这里需要根据任务类型来进行配置。如果任务是CPU密集型任务,那么线程池应该配置较小,例如线程池可以配置CPU核心数目相等的大小;如果是需要资源等待类型的任务(如I/O等访问,数据库操作等),则应该根据等待的平均时间,来配置N倍于CPU核心数目的大小。线程池数目配置的具体的大小,还需要在实际开发工作中,编写行能测试类,结合虚拟机行能监控工具(如VisualVM),来进行配置调优。
说明:
线程池提交的是一个Runnable类型的任务,因此线程池变量共享的问题,也就是多线程变量共享的问题。在多线程环境下,变量当然是可以共享的,例如售票系统中的票数限制,订单系统中的订单号等,都需对同一变量进行操作。为了控制篇幅,多线程共享问题在下一篇分析。