配置线程池参数
定时任务线程池基础参数
# 定时任务线程池基础参数 task: pool: corePoolSize: 5 # 核心线程数 maxPoolSize: 20 # 设置最大线程数 keepAliveSeconds: 300 # 设置线程活跃时间,单位秒 queueCapacity: 100 # 设置队列容量
定义参数实体bean
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @author snow * @Date 2024/4/8 * @Description */ @ConfigurationProperties(prefix = "task.pool") @Data public class TaskThreadPoolInfo { // 核心线程池大小,即保持活动状态的最小线程数 private Integer corePoolSize; // 最大线程池大小,即线程池允许创建的最大线程数 private Integer maxPoolSize; // 非核心线程空闲时的存活时间,超过此时间,空闲线程将被终止 private Integer keepAliveSeconds; // 阻塞队列容量,用于存放等待执行的任务 private Integer queueCapacity; }
配置线程池
在配置类中配置:
import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author snow * @Date 2024/4/8 * @Description */ @Configuration @EnableConfigurationProperties(TaskThreadPoolInfo.class) @Slf4j public class TaskExecutePool { private TaskThreadPoolInfo info; public TaskExecutePool(TaskThreadPoolInfo info) { this.info = info; } /** * 定义任务执行器 * @return */ @Bean(name = "threadPoolTaskExecutor",destroyMethod = "shutdown") public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ //构建线程池对象 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心线程数:核心线程数(获取硬件):线程池创建时候初始化的线程数 taskExecutor.setCorePoolSize(info.getCorePoolSize()); //最大线程数:只有在缓冲队列满了之后才会申请超过核心线程数的线程 taskExecutor.setMaxPoolSize(info.getMaxPoolSize()); //缓冲队列:用来缓冲执行任务的队列 taskExecutor.setQueueCapacity(info.getQueueCapacity()); //允许线程的空闲时间:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁 taskExecutor.setKeepAliveSeconds(info.getKeepAliveSeconds()); //线程名称前缀 taskExecutor.setThreadNamePrefix("XXXX-"); //设置拒绝策略 // taskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler()); //参数初始化 taskExecutor.initialize(); return taskExecutor; } /** * 自定义线程拒绝策略 * @return */ /** @Bean public RejectedExecutionHandler rejectedExecutionHandler(){ RejectedExecutionHandler errorHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { //TODO 可自定义Runable实现类,传入参数,做到不同任务,不同处理 log.info("股票任务出现异常:发送邮件"); } }; return errorHandler; } */ }
使用
注入线程池bean:
/** * 注入线程池对象 */ @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; //要求:将集合分组,每组的集合长度为20 Lists.partition(ids, 20).forEach(ids->{ //每个分片的数据开启一个线程异步执行任务 threadPoolTaskExecutor.execute(()->{ //解析数据 List<PO> list = XXX;// XXmapper.insertBatch(list); }); });