1、基于 ExecutorService 自定义线程池
使用 JDK 自带的 java.util.concurrent 包实现线程池(线程工厂借助 Guava 的 ThreadFactoryBuilder 创建)
private Logger logger = LoggerFactory.getLogger(InitBeanConfig.class); @Bean public ExecutorService callbackThreadPool() { //通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称 ThreadFactory factory = new ThreadFactoryBuilder() .setUncaughtExceptionHandler((t, e) -> logger.error(t.getName() + " excute error:", e)) .setNameFormat("callback-pool-%d").build(); int corePoolSize = 3; int maxPoolSize = 4; long keepAliveTime = 5; ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(100000), factory, new ThreadPoolExecutor.CallerRunsPolicy()); return pool; }
测试使用
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; @RestController @RequestMapping("thread") public class ThreadPoolController { @Resource private ExecutorService callbackThreadPool; @GetMapping public Object thread(){ callbackThreadPool.execute(() -> { // 业务代码块 }); return 1; } }
2、ThreadPoolTaskExecutor(Spring 提供,以及监听线程池)
@RestController public class PageController { @Autowired ThreadPoolTaskExecutor applicationTaskExecutor; // applicationTaskExecutor 为spring注册时定义得 beanName // 开辟两个线程,后等待两个线程 都执行完的案例 @GetMapping("/thread") public Object thread() throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> { for(int i = 0 ; i < 100000 ; i++){ System.out.println("a-"+i); } }, applicationTaskExecutor); CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> { for(int i = 0 ; i < 100000 ; i++){ System.out.println("w-"+i); } }, applicationTaskExecutor); // 等待这两个线程都执行完 CompletableFuture.allOf(completableFuture1, completableFuture2).get(); return "success"; } }
3、自定义 ThreadPoolTaskExecutor 线程池
自定义设置线程的最大线程数等参数。
自定义Bean
@Bean public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //此方法返回可用处理器的虚拟机的最大数量; 不小于1 //int core = Runtime.getRuntime().availableProcessors(); int core = 1; //设置核心线程数 executor.setCorePoolSize(core); //设置最大线程数 executor.setMaxPoolSize(core * 2 + 1); //除核心线程外的线程存活时间 executor.setKeepAliveSeconds(3); //如果传入值大于0,底层队列使用的是LinkedBlockingQueue,否则默认使用SynchronousQueue executor.setQueueCapacity(40); //线程名称前缀 executor.setThreadNamePrefix("thread-execute"); //设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
// "myThreadPoolTaskExecutor" is the bean name of the custom executor.
@Autowired
ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

/**
 * Runs two printing tasks on the custom pool and blocks until both are done.
 *
 * @return the literal {@code "success"} once both tasks have completed
 * @throws ExecutionException if either task fails
 * @throws InterruptedException if the waiting thread is interrupted
 */
@GetMapping("/thread")
public Object thread() throws ExecutionException, InterruptedException {
    // First task: prints "a-0" .. "a-99999".
    Runnable printA = () -> {
        for (int n = 0; n < 100000; n++) {
            System.out.println("a-" + n);
        }
    };
    // Second task: prints "w-0" .. "w-99999".
    Runnable printW = () -> {
        for (int n = 0; n < 100000; n++) {
            System.out.println("w-" + n);
        }
    };

    CompletableFuture<Void> firstJob =
            CompletableFuture.runAsync(printA, myThreadPoolTaskExecutor);
    CompletableFuture<Void> secondJob =
            CompletableFuture.runAsync(printW, myThreadPoolTaskExecutor);

    // Wait for both tasks to finish.
    CompletableFuture<Void> bothJobs = CompletableFuture.allOf(firstJob, secondJob);
    bothJobs.get();
    return "success";
}