SpringBoot ThreadPoolTaskExecutor @Async
在SpringBoot
项目中,异步线程池的使用,参数设置,队列拒绝策略;以及对比ForkJoinPool
各场景下的性能。
环境:jdk8、springboot 2.1.6
线程池注入(一)
多线程池注入,用于多个业务场景,避免各业务之间相互影响
package com.mpos.mnp.web.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @className: BeanConfig * @author: www.wityx.com */ @Configuration @EnableAsync public class BeanConfig { @Bean("taskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(50); // 设置最大线程数 executor.setMaxPoolSize(200); // 设置队列容量 executor.setQueueCapacity(20000); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("mnp-send"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); return executor; } @Bean("jobExecutor") public TaskExecutor jobExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(50); // 设置最大线程数 executor.setMaxPoolSize(200); // 设置队列容量 executor.setQueueCapacity(500); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置默认线程名称 executor.setThreadNamePrefix("mnp-job"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); return executor; } }
参数解释
corePoolSize
:核心线程数
- 核心线程会一直存活,及时没有任务需要执行
- 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
- 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
queueCapacity
:任务队列容量(阻塞队列)
- 当核心线程数达到最大时,新任务会放在队列中排队等待执行 ,队列存放的数据大小跟分配的内存有关
maxPoolSize
:最大线程数
- 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
- 当线程数=maxPoolSize,且任务队列已满时,线程池会跟根据拒绝策略进行相应的处理
keepAliveTime
:线程空闲时间(s)
- 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
- 如果allowCoreThreadTimeout=true,则会直到线程数量=0
threadNamePrefix
:设置默认线程名称
- 线程前缀名称,有助于区分不同线程池之间的线程
rejectedExecutionHandler
:任务拒绝处理器
- 两种情况会拒绝处理任务:
- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
- 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
- ThreadPoolExecutor类有几个内部实现类来处理这类情况:
- AbortPolicy 丢弃任务,抛运行时异常
- CallerRunsPolicy 执行任务 P:当线程池满以后,队列达到最大值时,异步先会变为同步执行,影响主线程性能,请结合业务场景,具体分析使用(本人亲测)
- DiscardPolicy 忽视,什么都不会发生
- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
以下参数请慎用, 线程一直再运行时,无法关闭程序,需kill
进程。并丢失队列中的数据。
// 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true);
异步使用(二)
异步实现类
package com.mpos.mnp.app.impl; import com.mpos.mnp.dao.entity.AgencyNoticeMessage; import com.mpos.mnp.dao.entity.AppNoticeMessage; import com.mpos.mnp.service.job.MessageJobService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author: http://www.wityx.com * @link:http://www.wityx.com/post/822_1_1.html */ @Component @Slf4j public class AsyncSendMessage { /** * 业务类Service */ @Resource private MessageJobService messageJobService; /** * 异步发送 * * @param id */ @Async("taskExecutor") public void sendAgencyMessage(String id) { log.info("异步通知"); // 参数传递用id,使用更小的字节放入缓存队列 messageJobService.sendAgencyMessage(null,id); } /** * 异步发送 * * @param id */ @Async("jobExecutor") public void sendAppMessage(String id) { log.info("异步推送消息"); // 参数传递用id,使用更小的字节放入缓存队列 messageJobService.sendAppPushMessage(null,id); } }
特别注意@Async
的使用,不可在同一类使用@Async
,否则不生效。
其他类使用使用异步处理
// 建议使用此注解,避免使用@Autowired,具体原因参考官方说明 @Resource private AsyncSendMessage asyncSendMessage; /** * 数据保存 * @author: www.wityx.com * @param param */ private void saveAppMessageData(AppMessageParam param) { String uniqNo = "MNP" + param.getReqSource() + param.getReqNo(); try { //防止重复提交判断 Redis锁 默认60秒有效期 boolean lockFlag = redisUtil.lock(uniqNo, uniqNo, 60); if (!lockFlag) { log.warn("【移动推送】请求单号:{}重复提交!", param.getReqNo()); throw new BusinessException(ResultCode.COMMON_DULIICATE_SUBMIT.getCode(), "请求号:" + param.getReqNo()); } // 异步处理 asyncSendMessage.sendAppMessage(message.getId()); } catch (BusinessException e) { throw e; } catch (Throwable e) { throw new BusinessException(ResultCode.SAVE_DB_ERROR); } finally { // 解锁 redisUtil.unlock(uniqNo, uniqNo); } }
在使用异步时,捕获异常建议使用Throwable
,Exception
的父类。
springboot启动类打开异步
package com.mpos.mnp.web; import com.alibaba.dubbo.config.spring.context.annotation.EnableDubboConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * Description: web 启动类 * @author: www.wityx.com */ @SpringBootApplication @EnableSwagger2 @ComponentScan(basePackages = {"com.wityx.www","com.gpay.user"}) @EnableMongoRepositories(basePackages = "com.wityx.www.dao.repository") @EnableDubboConfig @EnableScheduling @EnableAsync public class MnpWeb { public static void main(String[] args){ SpringApplication.run(MnpWeb.class, args); } }