前言
使用自创建的线程跟使用线程池有啥区别?提到线程池是不是脑海中闪现了创建线程池的那几个核心参数、工作流程、线程池的复用、拒绝机制、缓冲机制等,这些理论知识点想必也牢记许久了。虽然线程池支持在虚拟机进程接受到退出命令后可以进行shutdown。那么shutdown跟线程中断又有什么区别?在运行中的线程能否直接kill掉?我们能否监听关闭事件进行补偿?
为什么线程中断不了?
回答这个问题之前需要先思考,中断会有什么影响?如果直接kill掉,那么这个线程使用的所有资源能被正常释放吗?我们虽然interrupt()方法是中断线程的没错,但是它也仅仅是将线程的中断位设置为true,它不会停止线程,而是需要用户自己去监视线程的状态为并做对应的处理。可能你会说stop()方法,这个方法已经被废弃掉了,这种显示的停止的方法带来的问题会更多,你可以想象一下会有哪些影响?言归正传,如果线程没有对中断进行处理,仅仅是调用interrupt()是不会停止的,如下。
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (true){
if(Thread.currentThread().isInterrupted()){
System.out.println("Error");
}else {
System.out.println("Success");
}
}
});
thread.start();
//ps:此代码没有做响应中断处理。
thread.interrupt();
}
如何解决任务丢失?
前面说了这么多,现在看来,抛开直接KILL掉程序不说,首先我们要保证程序在正常的重启期间,任务是不能丢失的,你可能先想到是实现Hook方法,在程序关闭的时候触发收尾工作,来保证线程池的正常关闭。老师傅说可以交给Spring管理即可,思考了一下也是,我们只需要实现destroy方法,然后shutdown即可。既然想到了Spring,那Spring的线程池是否处理了shutdown方法,那我们来探究一下,日常使用Spring线程池的时候最常见的一行配置你肯定见过,如下。
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="10" />
<!-- 缓存队列 -->
<property name="queueCapacity" value="20" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
public void setCorePoolSize(int corePoolSize) {
//...
}
public int getCorePoolSize() {
//...
}
public void setMaxPoolSize(int maxPoolSize) {
//...
}
//...
}
InitializingBean跟DisposableBean这不经常见么,是的没错,凡是继承InitializingBean跟DisposableBean的Bean,会在bean的初始化跟销毁的时候调用对应的afterPropertiesSet()跟destory()方法。具体直接上一张图搞定。
ExecutorConfigurationSupport
初始化方法暂且抛开不说,因为我们这里探究的是销毁方法。直接上代码。
public void destroy() {
this.shutdown();
}
public void shutdown() {
if (this.logger.isInfoEnabled()) {
this.logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
//waitForTasksToCompleteOnShutdown为被调用时是否等待当前任务完成
if (this.waitForTasksToCompleteOnShutdown) {
//如果需要等待任务执行完,就调用shutdown方法
this.executor.shutdown();
} else {
//如果不需求,就强制关闭,并会返回那些未执行的任务
Iterator var1 = this.executor.shutdownNow().iterator();
while(var1.hasNext()) {
Runnable remainingTask = (Runnable)var1.next();
//取消所有剩下需要执行的线程
this.cancelRemainingTask(remainingTask);
}
}
//如果设置了等待时长,当前线程阻塞直到如下3种情况
//1:等所有已提交的任务(包括正在运行的和队列中等待的)执行完毕
//2:超过设置时长
//3:线程被中断,响应InterruptedException
this.awaitTerminationIfNecessary(this.executor);
}
}
由此上我们可以看到,使用Spring线程池可以最大程度上解决任务丢失的问题,当然我们可以通过实现DisposableBean接口来自定义销毁的操作。那么还有更优雅的处理方式吗?
Guava
我相信很多童鞋都用过Guava,Guava中包含了许多并发类,同时也包含了几个方便的线程池相关的ExecuorService的实现,但需要注意的是这些实现类都无法通过直接创建或者子类化来创建实例,但是我们可以通过MoreExecuors来创建它们,MoreExecuors中的方法如下
依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
getExitingExecutorService
//获得一个跟随JVM关闭而关闭的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
ExecutorService executorService = MoreExecutors.getExitingExecutorService(threadPoolExecutor);
源码分析
//关键源码
final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
//120S为关闭等待时长
return this.getExitingExecutorService(executor, 120L, TimeUnit.SECONDS);
}
final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
//改变ThreadFactory,为它设置守护线程,
MoreExecutors.useDaemonThreadFactory(executor);
//封装传入的线程池
ExecutorService service = Executors.unconfigurableExecutorService(executor);
//设置钩子线程,并设置等待时长
this.addDelayedShutdownHook(service, terminationTimeout, timeUnit);
return service;
}
final void addDelayedShutdownHook(final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
Preconditions.checkNotNull(service);
Preconditions.checkNotNull(timeUnit);
this.addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
public void run() {
try {
//关闭线程池
service.shutdown();
//这个需要等到提交的任务全部执行完,这个上文跟Spring的阻塞等待关闭是一个道理
service.awaitTermination(terminationTimeout, timeUnit);
} catch (InterruptedException var2) {
}
}
}));
}
addDelayedShutdownHook
//方法使用
MoreExecutors.addDelayedShutdownHook(threadPoolExecutor,120L,TimeUnit.SECONDS);
源码分析
//关键源码,此处调用的addDelayedShutdownHook跟上面的一样,只不过这里没有改变ThreadFactory,只设置了钩子线程
public static void addDelayedShutdownHook(ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
(new MoreExecutors.Application()).addDelayedShutdownHook(service, terminationTimeout, timeUnit);
}
shutdownAndAwaitTermination
//使用方法
MoreExecutors.shutdownAndAwaitTermination(threadPoolExecutor,10L,TimeUnit.SECONDS);
源码分析
//关键源码
public static boolean shutdownAndAwaitTermination(ExecutorService service, long timeout, TimeUnit unit) {
long halfTimeoutNanos = unit.toNanos(timeout) / 2L;
//预关闭
service.shutdown();
try {
//判断是否有提交的任务没有执行完。
if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
//如果没有执行完,继续尝试强行关闭
service.shutdownNow();
//然后进行阻塞等待关闭
service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException var7) {
//如果在关闭过程中如果发生中断,先设置中断恢复。
Thread.currentThread().interrupt();
//直接强行关闭
service.shutdownNow();
}
//返回状态
return service.isTerminated();
}
Example-Demo
//1
@Configuration
public class ThreadPoolConfiguration {
@Bean(name = "executorServiceExiting")
public ExecutorService executorService(){
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
return MoreExecutors.getExitingExecutorService(threadPoolExecutor);
}
@Bean(name = "executorServiceShutdownHook")
public ExecutorService executorServiceShutdownHook(){
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
MoreExecutors.addDelayedShutdownHook(threadPoolExecutor,120L,TimeUnit.SECONDS);
return threadPoolExecutor;
}
}
//2
@Configuration
public class ThreadPoolConfiguration {
@Bean
@Order(2)
public ExecutorService executorService(){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolRegisterCenter.register(threadPoolExecutor);
return threadPoolExecutor;
}
}
//-------------
/**
* @author Duansg
* @desc 线程池的注册中心
* @date 2020-04-01 00:23:12
*/
@Configuration
@Order(1)
public class ThreadPoolRegisterCenter implements ApplicationRunner, DisposableBean {
/**
* @desc 线程池容器
*/
private static final Set<ExecutorService> THREAD_POOL_CONTEXT = Sets.newConcurrentHashSet();
/**
* @desc 添加线程池到容器中
* @param executorService
*/
public static void register(ExecutorService executorService) {
THREAD_POOL_CONTEXT.add(executorService);
}
@Override
public void run(ApplicationArguments args) throws Exception {
//设置钩子方法,并使用shutdownAndAwaitTermination方法进行关闭处理
Runtime.getRuntime().addShutdownHook(new Thread(() -> THREAD_POOL_CONTEXT.forEach(executorService -> {
if (!ObjectUtils.isEmpty(executorService)){
shutdownAndAwaitTermination(executorService);
}
})));
}
@Override
public void destroy() throws Exception {
//防止调用tomcat reload方法,导致的无法感知到关闭事件
THREAD_POOL_CONTEXT.forEach(executorService -> {
if (shutdownAndAwaitTermination(executorService)){
THREAD_POOL_CONTEXT.remove(executorService);
}
});
}
/**
* @desc 关闭方法
* @param executorService
*/
private boolean shutdownAndAwaitTermination(ExecutorService executorService) {
return MoreExecutors.shutdownAndAwaitTermination(executorService, 10L, TimeUnit.SECONDS);
}
}