概述
Spring3开始提供了@Async注解,我们只需要在方法上标注此注解,此方法即可实现异步调用。 除此之外, 还得需要一个配置类,通过@EnableAsync 来开启异步功能 。
V1.0 默认的实现
Step1 搞配置类,开启@EnableAsync
我们需要使用@EnableAsync来开启异步任务支持。
@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
我们这里选择单独搞个配置类
@Configuration @EnableAsync public class ThreadPoolTaskConfig { }
Step2 搞方法标记 @Async注解
package com.artisan.jobs; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/3/1 0:42 * @mark: show me the code , change the world */ @Component @Slf4j public class AsyncJob { @Async public void job1() throws InterruptedException { long beginTime = System.currentTimeMillis(); Thread.sleep(2000); long endTime = System.currentTimeMillis(); log.info("job1 cost {} ms", endTime - beginTime); } @Async public void job2() throws InterruptedException { long beginTime = System.currentTimeMillis(); Thread.sleep(2000); long endTime = System.currentTimeMillis(); log.info("job2 cost {} ms", endTime - beginTime); } }
Step3 搞调用
package com.artisan.controller; import com.artisan.jobs.AsyncJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/3/1 0:44 * @mark: show me the code , change the world */ @RestController @RequestMapping("/async") @Slf4j public class AsyncController { @Autowired private AsyncJob asyncJob; @RequestMapping("/job") public String task() throws InterruptedException { long beginTime = System.currentTimeMillis(); // 执行异步任务 asyncJob.job1(); asyncJob.job2(); // 模拟业务耗时 Thread.sleep(1000); long cost = System.currentTimeMillis() - beginTime; log.info("main cost {} ms", cost); return "Task Cost " + cost + " ms"; } }
@Async注解在默认情况下用的是SimpleAsyncTaskExecutor
线,不是真正意义上的线程池。
所以,线程名称是 task-1 , task-2, task-3 , task-4…
2022-03-02 22:33:47.007 [http-nio-8080-exec-6] INFO com.artisan.controller.AsyncController:39 - main cost 1001 ms 2022-03-02 22:33:47.675 [http-nio-8080-exec-2] INFO com.artisan.controller.AsyncController:39 - main cost 1001 ms 2022-03-02 22:33:48.021 [task-4] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2014 ms 2022-03-02 22:33:48.021 [task-3] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2014 ms 2022-03-02 22:33:48.396 [http-nio-8080-exec-5] INFO com.artisan.controller.AsyncController:39 - main cost 1015 ms 2022-03-02 22:33:48.678 [task-6] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2004 ms 2022-03-02 22:33:48.678 [task-5] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2004 ms 2022-03-02 22:33:49.004 [http-nio-8080-exec-3] INFO com.artisan.controller.AsyncController:39 - main cost 1008 ms 2022-03-02 22:33:49.393 [task-8] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2011 ms 2022-03-02 22:33:49.393 [task-7] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2011 ms 2022-03-02 22:33:50.012 [task-9] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2015 ms 2022-03-02 22:33:50.012 [task-10] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2015 ms
可以看到,每次调用都会new一个线程。若系统中不断的创建线程…
Spring提供的线程池
名称 | 说明 |
SimpleAsyncTaskExecutor | 这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地 |
ConcurrentTaskExecutor | Executor的适配类,不推荐使用。如ThreadPoolTaskExecutor 不满足要求时,才用考虑使用这个类 |
ThreadPoolTaskScheduler | 可以使用cron表达式 |
ThreadPoolTaskExecutor | 推荐。 是对java.util.concurrent.ThreadPoolExecutor 的包装 |
V2.0 实现@Async的自定义线程池
package com.artisan.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。 * 我们这里选择使用单独的配置类AsyncConfiguration。 * @date 2022/3/1 0:41 * @mark: show me the code , change the world */ @Configuration @EnableAsync public class ThreadPoolTaskConfig { /** * 核心线程数(默认线程数) */ private static final int CORE_POOL_SIZE = 5; /** * 最大线程数 */ private static final int MAX_POOL_SIZE = 10; /** * 允许线程空闲时间(单位:默认为秒) */ private static final int KEEP_ALIVE_TIME = 10; /** * 缓冲队列大小 */ private static final int QUEUE_CAPACITY = 200; /** * 线程池名前缀 */ private static final String THREAD_NAME_PREFIX = "Async-Service-"; /** * 自定义线程池 * * @return */ @Bean("customAsyncPoolTaskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); executor.setQueueCapacity(KEEP_ALIVE_TIME); executor.setKeepAliveSeconds(QUEUE_CAPACITY); executor.setThreadNamePrefix(THREAD_NAME_PREFIX); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
其他保持不变, 重启测试
V3.0 多个线程池处理
需求: 不同的业务,使用不同的线程池
多个线程池
package com.artisan.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。 * 我们这里选择使用单独的配置类ThreadPoolTaskConfig * @date 2022/3/1 0:41 * @mark: show me the code , change the world */ @Configuration @EnableAsync public class ThreadPoolTaskConfig { /** * 核心线程数(默认线程数) */ private static final int CORE_POOL_SIZE = 5; /** * 最大线程数 */ private static final int MAX_POOL_SIZE = 10; /** * 允许线程空闲时间(单位:默认为秒) */ private static final int KEEP_ALIVE_TIME = 10; /** * 缓冲队列大小 */ private static final int QUEUE_CAPACITY = 200; /** * 线程池名前缀 */ private static final String THREAD_NAME_PREFIX = "Biz1_Async-Service-"; /** * 线程池名前缀 */ private static final String THREAD_NAME_PREFIX_2= "Biz2_Async-Service-"; /** * 自定义线程池 * * @return */ @Bean("tp1") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); executor.setQueueCapacity(KEEP_ALIVE_TIME); executor.setKeepAliveSeconds(QUEUE_CAPACITY); executor.setThreadNamePrefix(THREAD_NAME_PREFIX); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * 自定义线程池 * * @return */ @Bean("tp2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); executor.setQueueCapacity(KEEP_ALIVE_TIME); executor.setKeepAliveSeconds(QUEUE_CAPACITY); executor.setThreadNamePrefix(THREAD_NAME_PREFIX_2); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
配置 多个线程池, 然后 为@Async指定线程池名字即可实现 多个线程池处理
package com.artisan.jobs; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2022/3/1 0:42 * @mark: show me the code , change the world */ @Component @Slf4j public class AsyncJob { @Async("tp1") public void job1() throws InterruptedException { long beginTime = System.currentTimeMillis(); Thread.sleep(2000); long endTime = System.currentTimeMillis(); log.info("job1 cost {} ms", endTime - beginTime); } @Async("tp2") public void job2() throws InterruptedException { long beginTime = System.currentTimeMillis(); Thread.sleep(2000); long endTime = System.currentTimeMillis(); log.info("job2 cost {} ms", endTime - beginTime); } @Async() public void job3() throws InterruptedException { long beginTime = System.currentTimeMillis(); Thread.sleep(2000); long endTime = System.currentTimeMillis(); log.info("job3 cost {} ms", endTime - beginTime); } }
默认线程池
@Async()没标注,用哪个?????? 当系统存在多个线程池时,我们也可以配置一个默认线程池 ,配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池
package com.artisan.multi; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world * <p> * 实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池 */ @Configuration @EnableAsync @Slf4j public class DefaultAsyncConfiguration implements AsyncConfigurer { /** * 核心线程数(默认线程数) */ private static final int CORE_POOL_SIZE = 2; /** * 最大线程数 */ private static final int MAX_POOL_SIZE = 10; /** * 允许线程空闲时间(单位:默认为秒) */ private static final int KEEP_ALIVE_TIME = 10; /** * 缓冲队列大小 */ private static final int QUEUE_CAPACITY = 200; /** * 线程池名前缀 */ private static final String THREAD_NAME_PREFIX = "Default_Async-Service-"; @Bean(name = "defaultPool") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(CORE_POOL_SIZE); taskExecutor.setMaxPoolSize(MAX_POOL_SIZE); taskExecutor.setQueueCapacity(KEEP_ALIVE_TIME); taskExecutor.setKeepAliveSeconds(QUEUE_CAPACITY); taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX); /** * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 * 通常有以下四种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 指定默认线程池 */ @Override public Executor getAsyncExecutor() { return executor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("线程池执行任务发横未知错误,执行方法:{}", method.getName(), ex); } }
验证一把
@RestController @RequestMapping("/async") @Slf4j public class AsyncController { @Autowired private AsyncJob asyncJob; @RequestMapping("/job") public String task() throws InterruptedException { long beginTime = System.currentTimeMillis(); // 执行异步任务 asyncJob.job1(); asyncJob.job2(); asyncJob.job3(); // 模拟业务耗时 Thread.sleep(1000); long cost = System.currentTimeMillis() - beginTime; log.info("main cost {} ms", cost); return "Task Cost " + cost + " ms"; }
完美匹配…
源码
https://github.com/yangshangwei/boot2