yml参数配置
# 定时任务线程池基础参数 task: pool: corePoolSize: 5 # 核心线程数 maxPoolSize: 20 # 设置最大线程数 keepAliveSeconds: 300 # 设置线程活跃时间 queueCapacity: 100 # 设置队列容量
定义参数实体bean
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @Description */ @ConfigurationProperties(prefix = "task.pool") @Data public class TaskThreadPoolConfig { /** * 核心线程数(获取硬件):线程池创建时候初始化的线程数 */ private Integer corePoolSize; private Integer maxPoolSize; private Integer keepAliveSeconds; private Integer queueCapacity; }
配置线程池
import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author by itheima * @Date 2021/12/13 * @Description */ @Configuration @EnableConfigurationProperties(TaskThreadPoolInfo.class) @Slf4j public class TaskExecutePoolConfig { private TaskExecutePoolConfig pollConfig; public TaskExecutePoolConfig(TaskExecutePoolConfig pollConfig) { this.pollConfig= pollConfig; } /** * 定义任务执行器 * @return */ @Bean(name = "threadPoolTaskExecutor",destroyMethod = "shutdown") public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ //构建线程池对象 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心线程数:核心线程数(获取硬件):线程池创建时候初始化的线程数 taskExecutor.setCorePoolSize(pollConfig.getCorePoolSize()); //最大线程数:只有在缓冲队列满了之后才会申请超过核心线程数的线程 taskExecutor.setMaxPoolSize(pollConfig.getMaxPoolSize()); //缓冲队列:用来缓冲执行任务的队列 taskExecutor.setQueueCapacity(pollConfig.getQueueCapacity()); //允许线程的空闲时间:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁 taskExecutor.setKeepAliveSeconds(pollConfig.getKeepAliveSeconds()); //线程名称前缀 taskExecutor.setThreadNamePrefix("StockThread-"); //设置拒绝策略 taskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler()); //参数初始化 taskExecutor.initialize(); return taskExecutor; } /** * 自定义线程拒绝策略 * @return */ @Bean public RejectedExecutionHandler rejectedExecutionHandler(){ RejectedExecutionHandler errorHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { //TODO 可自定义Runable实现类,传入参数,做到不同任务,不同处理 log.info("股票任务出现异常:发送邮件"); } }; return errorHandler; } }
实战
在需要用到的地方注入
/** * 注入线程池对象 */ @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor;
使用
//要求:将集合分组,每组的集合长度为20 Lists.partition(stockIds,20).forEach(ids->{ //每个分片的数据开启一个线程异步执行任务 threadPoolTaskExecutor.execute(()->{ //拼接获取A股信息的url地址 String stockRtUrl=stockInfoConfig.getMarketUrl()+String.join(",",ids); //发送请求获取数据 //String result = restTemplate.getForObject(stockRtUrl, String.class); String result=restTemplate.postForObject(stockRtUrl,entity,String.class); //解析获取股票数据 List<StockRtInfo> list = parserStockInfoUtil.parser4StockOrMarketInfo(result, 3); //分批次批量插入 log.info("当前股票数据:{}",list); stockRtInfoMapper.insertBatch(list); }); });
线程池高级理论
线程池工作流程概述
说明:
1 当一个任务通过submit或者execute方法提交到线程池的时候,如果当前池中线程数(包括闲置线程)小于coolPoolSize,则创建一个新的线程执行该任务; 2 如果当前线程池中线程数已经达到coolPoolSize,则将任务放入等待队列; 3 如果任务队列已满,则任务无法入队列,此时如果当前线程池中线程数小于maxPoolSize,则创建一个临时线程(非核心线程)执行该任务。 4 如果当前池中线程数已经等于maxPoolSize,此时无法执行该任务,根据拒绝执行策略处理。 注意: 当池中线程数大于coolPoolSize,超过keepAliveTime时间的闲置线程会被回收掉。 回收的是非核心线程,核心线程一般是不会回收的。 如果设置allowCoreThreadTimeOut(true),则核心线程在闲置keepAliveTime时间后也会被回收。
线程池拒绝策略
【1】什么时候会触发线程池的拒绝策略?
1.当我们调用 shutdown 等方法关闭线程池后,如果再向线程池内提交任务,就会遭到拒绝;
2.线程池没有空闲线程(线程达到最大线程数且都在执行任务)并且队列已经满;★★★
【2】拒绝策略类型有哪些?
线程池为我们提供了4种拒绝策略:
AbortPolicy
这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略(默认)。
DiscardPolicy
当有新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。(不负责任)
DiscardOldestPolicy
丢弃任务队列中的头结点,通常是存活时间最长的任务,它也存在一定的数据丢失风险。
CallerRunsPolicy (推荐)
第四种拒绝策略是 ,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。
任务线程满了后,改策略可将执行的人为交换给主线程执行,这个过程相当于一个正反馈,此时如果主线程能处理,则处理,如果也不能处理,也就以为这当前服务不能接收新的任务了;
主线程处理任务期间,可以为线程池腾出时间,如果此时有新的空闲线程,那么继续协助主线程处理任务;
自定义拒绝策略
实现RejectedExecutionHandler接口来实现自己的拒绝策略;★★★
线程池参数设置原则
1)如何为线程池设置合适的线程参数?
目前根据一些开源框架,设置多少个线程数量通常是根据应用的类型**:IO 密集型、CPU 密集型。**
- I/O密集型
1.I/O密集型的场景在开发中比较常见,比如像 MySQL数据库读写、文件的读写、网络通信等任务,这类任务不会 特别消耗CPU资源,但是IO操作比较耗时,会占用比较多时间; 2.IO密集型通常设置为 2n+1,其中 n 为 CPU 核数;
CPU密集型
1.CPU密集型的场景,比如像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,这些场景大部分都是纯 CPU计算; 2.CPU密集型通常设置为n+1,这样也可避免多线程环境下CPU资源挣钱带来上下文频繁切换的开销;
2) 如何获取当前服务器的cpu核数?
int cors= Runtime.getRuntime().availableProcessors();
3) 无界队列问题
实际运行中,我们一般会设置线程池的阻塞队列长度,如果不设置,则采用默认值:
private int corePoolSize = 1; private int maxPoolSize = Integer.MAX_VALUE; private int keepAliveSeconds = 60; private int queueCapacity = Integer.MAX_VALUE;
在这个过程中,如果设置或者使用不当,容易造成内存溢出问题;
所以企业开发中,禁止使用默认的队列长度;