public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
ThreadPoolExecutor 的继承关系如下
Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor
其中有一个构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } 发现我们可以指定workQueue和handler。当然还有其余的构造函数,有类似的效果
线程池构造函数7大参数解释:
corePoolSize:核心线程数。
maximumPoolSize:最大线程数。表明线程中最多能够创建的线程数量。
keepAliveTime:空闲的线程保留的时间。
unit:空闲线程的保留时间单位。
BlockingQueue workQueue:用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
1、ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
3、SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
4、PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等
handler:饱和策略处理器。默认提供的4中策略上面已经有解释了
强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
但是,但是,但是。。。用fix是有坑的。详情请见我这篇博文(在生产环境的一个活生生的血案):
另外,此处我说一下ThreadPoolTaskExecutor:
ThreadPoolTaskExecutor是一个spring的线程池技术,其实,它的实现方式完全是使用ThreadPoolExecutor进行实现(有点类似于装饰者模式。当然Spring提供的功能更加强大些,因为还有定时调度功能)。
三、如何设置线程池的参数:
系统默认值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
那我们如何来设置呢?需要根据几个值来决定
tasks :每秒的任务数,假设为500~1000
taskcost:每个任务花费时间,假设为0.1s
responsetime:系统允许容忍的最大响应时间,假设为1s
做几个计算
corePoolSize = 每秒需要多少个线程处理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
计算可得 queueCapacity = 80/1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
计算可得 maxPoolSize = (1000-80)/10 = 92
(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。
JDK1.5 的线程池由 Executor 框架提供。 Executor 框架将处理请求任务的提交和它的执行解耦。可以制定执行策略。在线程池中执行线程可以重用已经存在的线程,而不是创建新的线程,可以在处理多请求时抵消线程创建、消亡产生的开销。如果线程池过大,会导致内存的高使用量,还可能耗尽资源。如果过小,会由于存在很多的处理器资源未工作,对吞吐量造成损失。
如何合理配置线程池大小,一般需要根据任务的类型来配置线程池大小:
1、如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1(比如是4核心 就配置为5)
2、如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
3、使用场景
1、当你的任务是非必要的时候。比如记录操作日志、通知第三方服务非必要信息等,可以使用线程池处理非阻塞任务
2、当你的任务非常耗时时候,可以采用线程池技术
3、当请求并发很高时,可以采用线程池技术优化处理
可以通过Executors静态工厂构建线程池,但一般不建议这样使用。
提醒:能够用线程池的时候,不要自己的去new线程start,在高并发环境下,系统资源是宝贵的,需要节约资源才能提高可用性。
小彩蛋
Executors工具类给我们提供了不少快捷创建线程池的方法,虽然我们不推荐使用。但是里面有个方法我觉得还是不错的:Executors.newSingleThreadExecutor() 如果把这个当作全局线程池,可以很好实现异步,并且还能保证任务的顺序执行,进而达到消峰的效果:
private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); private static AtomicInteger num = new AtomicInteger(); public static void main(String[] args) { for (int i = 0; i < 20; i++) { executorService.execute(() -> { num.getAndIncrement(); System.out.println(Thread.currentThread().getName() + "-->>>>" + num); }); } executorService.shutdown(); } 输出: pool-1-thread-1-->>>>1 pool-1-thread-1-->>>>2 pool-1-thread-1-->>>>3 pool-1-thread-1-->>>>4 pool-1-thread-1-->>>>5 pool-1-thread-1-->>>>6 pool-1-thread-1-->>>>7 pool-1-thread-1-->>>>8 pool-1-thread-1-->>>>9 pool-1-thread-1-->>>>10 pool-1-thread-1-->>>>11 pool-1-thread-1-->>>>12 pool-1-thread-1-->>>>13 pool-1-thread-1-->>>>14 pool-1-thread-1-->>>>15 pool-1-thread-1-->>>>16 pool-1-thread-1-->>>>17 pool-1-thread-1-->>>>18 pool-1-thread-1-->>>>19 pool-1-thread-1-->>>>20
我们发现pool只有一个,且thread也只有一个,而且任务都是顺序执行的。当我们用Fiexed的话,也是能够达到顺序执行的效果的。因为内部的阻塞队列是FIFO的实现:
private static final ExecutorService executorService = Executors.newFixedThreadPool(5); private static AtomicInteger num = new AtomicInteger(); public static void main(String[] args) { for (int i = 0; i < 20; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + "-->>>>" + num.getAndIncrement()); }); } executorService.shutdown(); } 输出: pool-1-thread-2-->>>>0 pool-1-thread-1-->>>>1 pool-1-thread-2-->>>>2 pool-1-thread-1-->>>>3 pool-1-thread-2-->>>>4 pool-1-thread-1-->>>>5 pool-1-thread-2-->>>>6 pool-1-thread-1-->>>>7 pool-1-thread-2-->>>>8 pool-1-thread-1-->>>>9 pool-1-thread-2-->>>>10 pool-1-thread-1-->>>>11 pool-1-thread-2-->>>>12 pool-1-thread-1-->>>>13 pool-1-thread-2-->>>>14 pool-1-thread-1-->>>>15 pool-1-thread-2-->>>>16 pool-1-thread-4-->>>>17 pool-1-thread-3-->>>>18 pool-1-thread-5-->>>>19