本文主要介绍Java线程池的原理,涉及源码方面的分析,并最终实现动态设置线程池中主要参数的案例。
线程池的是利用池化思想设置的管理多线程的工具,其主要的优势是:(1)减少创建线程和销毁线程的资源开销;(2)多个线程并行处理能有效提升多任务处理的效率;(3)可以自定义线程池中的参数,有良好的可拓展性。本文重点针对动态设置线程池中参数进行说明和演示。
一、线程池原理
Java中线程池的继承关系,其中Executor是顶层接口,规定了线程池的最基本的execute()方法,其继承的ExecutorService拓展了对线程池的操作的方法,其主要的实现类是ThreadPoolExecutor,也是常见的创建线程池的实现类。
1.线程池参数含义
以下说明线程池实现类ThreadPoolExecutor中参数的含义:
int corePoolSize,//核心线程池的大小 int maximumPoolSize,//最大线程池的大小 long keepAliveTime,//存活时间 TimeUnit unit,//时间单位 BlockingQueue<Runnable> workQueue,//工作队列 ThreadFactory threadFactory,//线程产生工厂 RejectedExecutionHandler handler)//拒绝策略
设置线程池参数的策略
主要需要考虑的是设置corePoolSize、maximumPoolSize、workQueue参数,业界常规的设置策略:
CPU密集型:corePoolSize = CPU核数 + 1
IO密集型:corePoolSize = CPU核数 * 2
2.线程池任务调度逻辑
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其流程图如下:
其中ThreadPoolExecutor#execute()方法的处理逻辑:
public void execute(Runnable command) { //1、判断是否传进来线程 if (command == null) throw new NullPointerException(); int c = ctl.get(); //2、判断工作线程池是否满了 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //3、判断工作队列是否满了 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //4、以上条件都不符合,直接拒绝 else if (!addWorker(command, false)) reject(command); }
二、动态设置线程池参数
在日常实践过程中,一次性就能确定线程池的参数其实是比较困难,所以就需要能够动态设置线程池的参数。以下是几种常见的动态设置线程池参数的方法。
public void setCorePoolSize(int corePoolSize); //设置核心线程数 public void setMaximumPoolSize(int maximumPoolSize); //设置最大线程数 public void setKeepAliveTime(long time, TimeUnit unit); //设置空闲存活时间 public void setThreadFactory(ThreadFactory threadFactory); //设置线程工厂 public void setRejectedExecutionHandler(RejectedExecutionHandler handler); //设置拒绝策略
动态设置线程池参数方法
调整核心线程数和最大线程数的
executor.setCorePoolSize(corePoolSize + size); executor.setMaximumPoolSize(maxPoolSize + size);
调整线程池阻塞队列大小
原始LinkedBlockingQueue的capacity是final类型,无法被修改。
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity;
可以通过重写LinkedBlockingQueue成ResizeableCapacityLinkedBlockingQueue,将capacity的final修饰符去掉,实现setter/getter方法,这样就能正常修改了。
private int capacity; public int getCapacity() { return capacity; } public void setCapacity(int capacity) { this.capacity = capacity; }
1.具体实践
之前工作中碰到一个场景,就是正常情况下使用默认的线程池参数执行多线程任务可以正常执行,但在某些特殊情况下发的任务执行时间远远超过任务超时执行时间,所以当执行这类任务的时候就会报错。针对这种情况,当时考虑使用动态设置线程池的方案,即在正常情况下使用默认线程池执行,在运行特殊任务的时候将默认线程池进行对应任务数的弹性扩容,这样就不会影响到正常的任务执行,但该类任务执行完成后,再将线程数设置为默认值。
void noVmExeperiment(int size) { int corePoolSize = executor.getCorePoolSize(); executor.setCorePoolSize(corePoolSize + size); //执行该类任务时动态扩容核心线程数 for (int i = 0; i < size; i++) { executor.execute(new Runnable() { @Override public void run() { System.out.println(">>>>>>>>" + Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } executor.setCorePoolSize(corePoolSize); //执行完成后核心线程数回退到默认值 }
2.常见问题
当设置的corePoolSize大于原maximumPoolSize,其最大的工作线程不会达到corePoolSize,只会是maximumPoolSize。比如原线程corePoolSize是3,maximumPoolSize是10,经过动态调整后corePoolSize为20,其实际最大工作线程也只会是10。
究其原因这是由于hreadPoolExecutor#getTask()方法中执行的逻辑是:创建新的工作线程会将工作线程+1,当但工作线程超过maximumPoolSize时会将工作线程-1,这样一来工作线程值停留在maximumPoolSize。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //当工作线程数量超过最大线程数时,减少工作线程数 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
TODO
- 还需要再详细说明线程池的源码;
参考资料
- Java线程池实现原理及其在美团业务中的实践:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
- 动态线程池的简单实现思路:https://juejin.cn/post/7240065163486216253
- 线程池监控和动态配置:https://juejin.cn/post/7104814252510150692
- Java并发编程学习篇8_基于开源的配置中心的轻量动态线程池dynamic-tp实践与源码原理分析:https://blog.csdn.net/qq_24654501/article/details/125503922?spm=1001.2014.3001.5501
- 线程池中各个参数如何合理设置:https://blog.csdn.net/riemann_/article/details/104704197
- 填个坑!再谈线程池动态调整那点事。:https://segmentfault.com/a/1190000040858637 (动态调整线程池参数)
- 如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。:https://mp.weixin.qq.com/s/YbyC3qQfUm4B_QQ03GFiNw