编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
目录
9.1.5.3. Executors-newFixedThreadPool
9.1.5.4. Executors-newCachedThreadPool
9.1.5.5. Executors-newSingleThreadExecutor
9.1.5.6. Executors-newSingleThreadExecutor
9.1.5. ThreadPoolExecutor
9.1.5.1. 状态和数量
编辑
ThreadPoolExecutor
状态和数量:
状态名 |
高3位 |
接受新任务 |
处理阻塞队列任务 |
说明 |
RUNNING |
111 |
Y |
Y |
接受新任务,并会处理阻塞队列中的任务 |
SHUTDOWN |
000 |
N |
Y |
不会接受新任务,但会处理阻塞队列中剩余的任务 |
STOP |
001 |
N |
N |
中断正在执行的任务,抛弃阻塞队列中的任务 |
TERMINATED |
010 |
- |
- |
任务全部执行完毕,活动线程数为0,即将进入终结 |
TERMINATED |
011 |
- |
- |
线程池终结 |
采用int高3位表示线程池状态,低29位表示线程数量,存储在一个原子变量ctl中,目的是将线程状态与线程个数合二为一,这样就可以用一次CAS对其赋值
|
||||
从数字上,TERMINATED>TERMINATED>STOP>SHUTDOWN>RUNNING,高三位的1表示负数 |
9.1.5.2. 构造方法
public ThreadPoolExecutor( int corePoolSize,//核心线程数(最多保留的线程数) int maximumPoolSize,//最大线程数 long keepAliveTime,//生存时间,针对救急线程 TimeUnit unit,//时间单位,针对救急线程 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory,//线程工厂,创建线程时起名字 RejectedExecutionHandler handler)//拒绝策略
ThreadPoolExecutor
的工作流程:
ThreadPoolExecutor
包含两类线程:核心线程和救急线程,采用懒加载的创建方式,存在救急线程的前提是选择有界队列corePoolSize
指核心线程数,maximumPoolSize
指核心线程数+救急线程数- 当核心线程都在执行任务且阻塞队列已满但是还有任务继续入队时,
ThreadPoolExecutor
会先检查线程池中是否可以有救急线程 - 有,救急线程执行多出来的任务,执行完任务等待
keepAliveTime
后,要是没有任务继续入队,救急线程就会被销毁,下次高峰期才会再次创建救急线程 - 没有,说明任务数超过了
maximumPoolSize
,采用拒绝策略 - JDK提供了4中拒绝策略
编辑
AbortPolicy
:抛出RejectedExecutionException
异常,默认策略CallerRunsPolicy
:让调用者运行任务DiscardPolicy
:放弃本次任务DiscardOldestPolicy
:放弃队列中最早的任务,本任务取而代之
- 第三方框架中也有一些拒绝策略的扩展
Dubbo
在AbortPolicy
基础上增加日志功能,并调用jstack
抓取当前栈中的信息,方便定位问题Netty
创建新的线程来执行任务,这样实现并不好,因为就没有了限制ActiveMQ
超时等待60sPinPoint
使用了一个拒绝策略链,尝试策略链中每一个拒绝策略
9.1.5.3. Executors-newFixedThreadPool
//创建一个固定大小的线程池:适用于任务量已知,相对耗时的任务 public static ExecutorService newFixedThreadPool(int nThreads) {//传递的线程数 return new ThreadPoolExecutor( //核心线程数:nThreads,最大线程数:nThreads nThreads, nThreads,//没有救急线程 0L, TimeUnit.MILLISECONDS,//存活时间:0毫秒 //阻塞队列:LinkedBlockingQueue无界队列 new LinkedBlockingQueue<Runnable>()); }
9.1.5.4. Executors-newCachedThreadPool
//创建一个缓冲线程池:适用于任务量不断增长,但每个任务执行时间较短的情况 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( //核心线程数:0最大线程数:2,147,483,647 0, Integer.MAX_VALUE,//没有核心线程,全都是救急线程,且可以无限创建,存活时间为60s 60L, TimeUnit.SECONDS,//存活时间:60秒 //阻塞队列:SynchronousQueue同步队列,没有容量,一手交钱一手交货 new SynchronousQueue<Runnable>(), ); }
9.1.5.5. Executors-newSingleThreadExecutor
//创建一个单线程线程池:适用于任务是串行执行,多出来的任务排队 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor( //核心线程数1,最大线程数1 1, 1, 0L, TimeUnit.MILLISECONDS,//存活时间0毫秒 //阻塞队列:LinkedBlockingQueue无界队列 new LinkedBlockingQueue<Runnable>())); }
9.1.5.6. Executors-newSingleThreadExecutor
//创建一个带有任务调用的线程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //延时执行任务 public <V> ScheduledFuture<V> schedule( //具体执行的任务对象 Runnable command, //延时时间 long delay, TimeUnit unit); //定时执行任务 public ScheduledFuture<?> scheduleAtFixedRate( //具体的执行任务对象 Runnable command, //初始延时时间 long initialDelay, //任务之间的执行延迟时间:从上一次任务开始执行时,延迟时间就开始 long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay( //具体的执行任务对象 Runnable command, //初始延时时间 long initialDelay, //任务之间的执行延迟时间:从上一次任务执行结束时,延迟时间才开始 long delay, TimeUnit unit)
9.1.5.7. 创建线程池方法对比
newSingleThreadExecutor()
与自己创建一个单线程串行执行任务的区别:
- 自己创建一个单线程串行执行任务如果遇到异常情况,没有任务补救措施,整个程序停止
newSingleThreadExecutor()
遇到异常情况还会创建一个新的线程,保持始终有一个线程工作
newSingleThreadExecutor()
和newFixedThreadPool(1)
的区别
newSingleThreadExecutor()
线程数始终为1,不能修改,FinalizableDelegatedExecutorService
应用的是装饰器模式,对外只暴露了ExecutorService
接口,不能调用ThreadPoolExecutor
中特有的方法newFixedThreadPool(1)
初始线程数为1,之后还可以通过对外暴露的ThreadPoolExecutor
对象来调用其setCorePoolSize()
来修改线程数
9.1.5.8. 提交任务方法
//执行任务 void execute(Runnable command); //提交任务task,用返回值Future获取任务执行的结果 <T> Future<T> submit(Callable<T> task); //提交tasks中所有任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException; //超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException; //提交tasks中所有任务,哪个任务先执行完毕,返回此任务的返回结果,其他任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException; //超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException;
@Slf4j public class SubmitTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(3); invokeAny(pool); } private static void invokeAny(ExecutorService pool) throws InterruptedException, ExecutionException { Object result = pool.invokeAny(Arrays.asList( () -> { log.debug("running...111"); Thread.sleep(new Random().nextInt(10000)); log.debug("end...111"); return "1"; }, () -> { log.debug("running...222"); Thread.sleep(new Random().nextInt(10000)); log.debug("end...222"); return "2"; }, () -> { log.debug("running...333"); Thread.sleep(new Random().nextInt(10000)); log.debug("end...333"); return "3"; } )); log.debug("执行结果:{}",result); } private static void invokeAll(ExecutorService pool) throws InterruptedException { List<Future<Object>> futures = pool.invokeAll(Arrays.asList( () -> { log.debug("running...111"); Thread.sleep(new Random().nextInt(10000)); return "1"; }, () -> { log.debug("running...222"); Thread.sleep(new Random().nextInt(10000)); return "2"; }, () -> { log.debug("running...333"); Thread.sleep(new Random().nextInt(10000)); return "3"; } )); futures.forEach(future -> { try { log.debug("执行结果:{}", future.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); } private static void submit(ExecutorService pool) throws InterruptedException, ExecutionException { Future<String> future = pool.submit(() -> { log.debug("正在执行"); Thread.sleep(1000); return "任务结束"; }); log.debug("执行结果:{}", future.get()); } }
9.1.5.9. 关闭线程池
/* 1.将线程池状态变为SHUTDOWN 2.不会接受新任务,把剩余任务完成 3.不会阻塞调用线程 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //修改线程池状态 advanceRunState(SHUTDOWN); //打断空闲的线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //尝试终止线程池,检查线程池状态是否为TERMINATED tryTerminate(); }
/* 1.将线程池状态变为STOP 2.不会接受新任务,剩余任务抛弃并返回 3.用interrupt打断正在执行任务的线程 */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //修改线程池状态 advanceRunState(STOP); //打断所有线程 interruptWorkers(); //将剩余任务返回 tasks = drainQueue(); } finally { mainLock.unlock(); } //尝试终止线程池,检查线程池状态是否为TERMINATED tryTerminate(); return tasks; }
//检查线程池状态是否处于RUNNING,是返回false,否返true public boolean isShutdown(); //检查线程池状态是否处于TERMINATED,是返回true,否返false public boolean isTerminated(); //调用shutdown()方法后,线程池不会等待所有线程任务执行结束 //如果想在线程池TERMINATED后做些事情,可以用此方法等待 public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException