[toc]
简介
往往当我们谈论多线程的时候,总会提起一个词“异步”,其实多线程不应该等于异步,异步意味着解耦,虽然多线程本身解耦,但是能实现异步的方式实在太多太多了,如发布/订阅,mq......,所以,一般来说我更倾向于将多线程划归实现异步的一种方式。
我们以B/S架构下的系统开发作为背景,在实际开发过程中我们在什么场景需要用到多线程呢?
- 一件事情需要很长时间才可以做完,让S端去做把,B端不等它
- 要完成B端的请求,需要做好多件事情 ,让S端去做把,B端不等它
- 要完成B端的请求,需要其它好多服务配合处理才能完成,这时可以让S端使用多线程同时调度其它服务过来干活,最终等干活最慢的那个服务返回结果之后,S端汇总所有结果返回给B端
- ......
SpringBoot 框架从2013年诞生,至今10年过去了,已是一个非常成熟的框架,基于开源的特性,以及社区的努力,现在已经是 java领域最为火热,应用最广的web开发框架了。
SpringBoot框架在多线程方面的设计非常的全面、友好,开发人员极易理解,在项目中实践也非常容易得心应手。
本文总结了在使用SpringBoot开发项目过程中常见的多线程编码方式,以简单的示例作为嵌入点,展示在项目开发中应用多线程都有哪些姿势,旨在总结、分享、交流编程技巧。
先聊聊Thread类
提到线程,无法避免的会想到Thread这个类,这个类诞生于java 1.0版本,Thread类实现了Runnable接口。借助Thread类我们可以轻易的创建一个线程。
- 方法一:创建一个类,继承Thread,并重写run()方法,而后调用Thread.Start()方法启动新线程
- 方法二:创建一个类,实现Runnable接口,并重写run()方法,将此类实例化后的对象传递到Thread的构造函数中,而后调用Thread.Start()方法启动新线程
示例代码:
新建一个类 DemoThread ,继承Thread
public class DemoThread extends Thread {
@Override
public void run() {
//do something
try {
System.out.println("ThreadDemo1 - 当前线程为:" + Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("ThreadDemo1 - 异步执行完毕");
}
}
}
新建一个类 DemoRunnable ,实现 Runnable
public class DemoRunnable implements Runnable {
@Override
public void run() {
//do something
try {
System.out.println("DemoRunnable - 当前线程为:" + Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(" DemoRunnable- 异步执行完毕");
}
}
}
新建一个Controller ,尝试通过Thread开启新线程
- 先打印当前线程的名字
- 实例化 DemoThread ,并直接调用start方法启动新线程
- 实例化 DemoRunnable,并放进Thread构造函数,并调用start方法启动新线程
System.out.println("当前线程为:" + Thread.currentThread().getName());
DemoThread demoThread = new DemoThread();
demoThread.start();
DemoRunnable runnable = new DemoRunnable();
new Thread(runnable).start();
通过输出我们可以观察到确实开辟了新的线程
当前线程为:http-nio-9009-exec-3
ThreadDemo1 - 当前线程为:Thread-3
DemoRunnable - 当前线程为:Thread-4
ThreadDemo1 - 异步执行完毕
DemoRunnable- 异步执行完毕
再聊聊线程池
什么是线程池
首先需要说明,线程池绝非 java 或者 SpringBoot 领域独有产物,几乎所有设计了线程的编程语言都有线程池这个东西。
线程池的核心思想就是“池化设计”,类似我们的数据库连接池、web服务器连接池等等。
将线程池化,便于管理和监控线程的生命周期、将线程的创建和线程执行的任务进行解耦。
在SpringBoot中,往往我们会提供至少 corePoolSize、queueCapacity、maximumPoolSize、RejectedExecutionHandler这四个元素来创建线程池,其中:
- corePoolSize 为应用启动完毕默认创建的线程数量、也叫做核心线程数量
- queueCapacity为队列容量,当核心线程都有正在处理的任务,则再进来的新线程请求将进入队列中排队等待
- maximumPoolSize 最大线程数,当队列满了的时候,在进来的新线程请求将直接创建新的线程,这时候会出现实际线程数量大于核心线程数量的情况,当请求足够的多,并且队列依然爆满,则继续直接创建新的线程,而这个上限就是最大线程数量了。达到最大线程数量之后,线程池将执行拒绝策略。
- RejectedExecutionHandler 拒绝策略,常见的有如下的策略:
- AbortPolicy
该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常 - DiscardPolicy 如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
- DiscardOldestPolicy 这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间
- CallerRunsPolicy 一旦线程池拒绝,主线程会自己去执行该任务
- 自定义拒绝策略,实现 RejectedExecutionHandler 接口,并重写rejectedExecution 方法
- AbortPolicy
线程池的好处
- 线程的创建统一管理
- 线程复用,避免频繁创建和销毁线程带来的性能损耗
- 线程更可控
- 解耦线程的创建和任务的执行
SpringBoot中创建线程池
需要说明的是,一个应用中可以创建多个线程池。
如下创建一个线程池配置类 ThreadPoolConfig ,在此类中新建了数个线程池。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
* 定义一个默认的线程池
*
* @return
*/
@Bean("defaultExecutor")
public Executor defaultExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("defaultExecutor-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
/**
* 可以根据不同的业务场景或者业务板块定义不同的线程池
*
* @return
*/
@Bean("anOtherExecutor")
public ThreadPoolTaskExecutor anOtherExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(10);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(20);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(500);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("anOtherExecutor-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
/**
* 固定大小的线程池
*
* @return
*/
@Bean("fixedExecutor")
public ThreadPoolExecutor fixedExecutor() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return executor;
}
@Bean("threadPoolExecutor")
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 100, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}
ThreadPoolExecutor和ThreadPoolTaskExecutor
- ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC
- ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理,实际对线程的管理还是依赖ThreadPoolExecutor
ThreadPoolTaskExecutor 源码:
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
......
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
......
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
return this.threadPoolExecutor;
}
......
@Override
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
......
}
SpringBoot中实践
经过上面的铺垫,终于到了正文环节
使用@Async
@EnableAsync和@Async简介
加了@EnableAsync这个注解之后,SpringBoot程序就开启了异步调用的功能,配合@Async 注解可以让方法在异步线程中执行。
比如,Controller 调用了一个Service中的方法,如果此方法添加了@Async 注解,那么Controller将不等待Service执行完毕,就将结果响应给请求端了。
@Async使用方法
如下示例代码演示了@EnableAsync和@Async的使用方法。
controller
@GetMapping("/case7")
public void case7() throws InterruptedException {
System.out.println("controller-当前线程为:" + Thread.currentThread().getName());
String s = testService.case7();
System.out.println("case7返回值:" + s);
}
service
@Async
public String case7() throws InterruptedException {
System.out.println("service-case7,当前线程:" + Thread.currentThread().getName());
Thread.sleep(5000);
System.out.println("case7执行完毕了");
return "case7执行完毕了";
}
输出:
controller-当前线程为:http-nio-9009-exec-2
2023-06-28 14:19:18.132 INFO 20904 --- [nio-9009-exec-2] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
case7返回值:null
service-case7,当前线程:SimpleAsyncTaskExecutor-1
case7执行完毕了
从输出可以看到:
- controller和service确实不是一个线程
- 添加@Async注解后,该方法在新线程中执行
- 没有等到service执行完毕,controller已经得到了 null 结果,说明controller没有等待service
除此之外:
- 只能返回void或java.util.concurrent.Future
- @Async注解不仅可以放在方法上,还可以放在类上
- @Async注解的参数value可以设置线程池的名字,设置后,方法将在指定线程池中执行
- 在@Configuration类中声明的方法不支持@Async
获取异步方法返回值
当异步方法有返回值时,如何获取异步方法执行的返回结果呢?
这时需要异步调用的方法带有返回值CompletableFuture。CompletableFuture是对Feature的增强,Feature只能处理简单的异步任务,而CompletableFuture可以将多个异步任务进行复杂的组合。
@RestControllerpublic class AsyncController {
@Autowired
private AsyncService asyncService;
@SneakyThrows
@ApiOperation("异步 有返回值")
@GetMapping("/open/somethings")
public String somethings() {
CompletableFuture<String> createOrder = asyncService.doSomething1("create order");
CompletableFuture<String> reduceAccount = asyncService.doSomething2("reduce account");
CompletableFuture<String> saveLog = asyncService.doSomething3("save log");
// 等待所有任务都执行完
CompletableFuture.allOf(createOrder, reduceAccount, saveLog).join();
// 获取每个任务的返回结果
String result = createOrder.get() + reduceAccount.get() + saveLog.get();
return result;
}}
@Slf4j
@Service
public class AsyncService {
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething1(String message) throws InterruptedException {
log.info("do something1: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("do something1: " + message);
}
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething2(String message) throws InterruptedException {
log.info("do something2: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("; do something2: " + message);
}
@Async("doSomethingExecutor")
public CompletableFuture<String> doSomething3(String message) throws InterruptedException {
log.info("do something3: {}", message);
Thread.sleep(1000);
return CompletableFuture.completedFuture("; do something3: " + message);
}
}
- CompletableFuture 将多个异步任务通过 allOf()...join() 组合起来了
- 上述代码中Controller将会堵塞,直到最慢的一个子线程执行完毕才会继续执行
- CompletableFuture 是一个泛型类,意味着异步方法也可以有复杂的返回值结构
@Async实现原理
- 在SpringBoot中,注解意味着AOP,而AOP意味着代理类。所以@Async的实现原理就是基于代理类实现的
- 执行线程是如何创建的?
- 先根据类型从容器中寻找类型为 TaskExecutor 的类,即线程池配置
- 再根据名称从容器中寻找名称为 taskExecutor 的类,还是再找线程池配置
- 如果确实没有定义线程池,那么将实例化一个 Thread 对象来执行。
SimpleAsyncTaskExecutor 源码
......
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
public Thread createThread(Runnable runnable) {
Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
thread.setPriority(getThreadPriority());
thread.setDaemon(isDaemon());
return thread;
}
......
注意事项
失效
@Async注解会在以下几个场景失效:
- 异步方法使用static关键词修饰;
- 异步类不是一个Spring容器的bean;
- SpringBoot应用中没有添加@EnableAsync注解;
- 在同一个类中,一个方法调用另外一个有@Async注解的方法,注解不会生效。原因是@Async注解的方法,是在代理类中执行的。
返回值
需要注意的是: 异步方法使用注解@Async的返回值只能为void或者Future及其子类,当返回结果为其他类型时,方法还是会异步执行,但是返回值都是null
使用匿名类
如下代码演示了在service中直接实例化 Runnable 匿名类来处理异步任务
@Resource(name = "threadPoolExecutor")private ThreadPoolExecutor threadPoolExecutor;
public void case2() {
for (int i = 1; i < 21; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("当前线程名称:" + threadName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
当然也可以定义一个任务执行类,并将此类交由Executor执行
TestService:
private UserService userService;
public void case22() {
threadPoolExecutor.execute(new TestRunnable(userService, new TestRunnableParam().setId(222).setName("xiaoming")));
}
TestRunnable:
@AllArgsConstructor
public class TestRunnable implements Runnable {
private UserService userService;
private TestRunnableParam param;
@Override
public void run() {
System.out.println("从service传递过来的 UserService 为:" + userService.getUserName());
System.out.println("入参为:" + JSON.toJSONString(param));
//do something ......
}
}
使用Guava的MoreExecutors
MoreExecutors是一个多线程执行器的工具类,用来生成各种Executors类。比如生成一个可以随着jvm关闭而关闭的线程池、创建一个顺序执行的Executor等等。
通过向 listeningDecorator 装饰器中传递 ThreadPoolExecutor 就可以初始化一个 ListeningExecutorService ,ListeningExecutorService 可以为线程添加回调函数
如下代码演示了 ListeningExecutorService 的基本使用方法:
@Resource(name = "threadPoolExecutor")private ThreadPoolExecutor threadPoolExecutor;
public String case4() {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
for (int i = 0; i < 5; i++) {
ListenableFuture<String> submit = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep( 1000);
return "ok";
}
});
submit.addListener(new Runnable() {
@Override
public void run() {
log.info("线程回调:" + Thread.currentThread().getName());
}
}, executorService);
}
return null;
}
- 当线程执行完毕自行回调到回调函数中,主线程无需等待
聊聊CountDownLatch
CountDownLatch可以理解为一个工具,借助此工具可以实现主线程等待所有子线程执行完毕后在做其他操作的功能。
下面示例代码演示了使用方法:
CountDownLatch
// 初始化CountDownLatch
final CountDownLatch latch = new CountDownLatch(projectIds.size());
projectIds.forEach(it -> {
threadTaskExecutor.execute(() -> {
try {
// 业务代码
} catch (Exception e) {
// 异常信息捕获
} finally {
// 将计数器减1
latch.countDown();
}
});
});
// 等待计算器的值变为0,此处会发生堵塞
latch.await();
引用
- MoreExecutors: https://www.jianshu.com/p/0f615b6f1566