在Java开发领域,异步编程是提升系统吞吐量、优化用户体验的核心手段之一。而Spring框架提供的@Async注解,更是让开发者无需深入了解复杂的线程池原理,就能轻松实现异步调用。但实际开发中,很多同学在使用@Async时会遇到“异步不生效”“线程池耗尽”“事务失效”等问题。本文将从实战出发,结合底层源码,全面拆解@Async注解的使用方法、核心原理、避坑要点,搭配可直接运行的实例,让你真正吃透异步编程的精髓。
一、@Async注解核心认知:什么是异步调用?为什么需要它?
1.1 同步VS异步:本质区别
在讲解@Async之前,我们先明确同步调用与异步调用的核心差异:
- 同步调用:方法A调用方法B后,必须等待方法B执行完毕并返回结果,A才能继续执行,整个过程是阻塞的。
- 异步调用:方法A调用方法B后,无需等待B执行完毕,A可以直接继续执行后续逻辑;而B会在独立的线程中异步执行。
1.2 异步调用的适用场景
异步调用适合处理“耗时且非核心流程”的操作,典型场景包括:
- 接口响应后的日志记录、数据统计(如用户登录后记录登录日志);
- 邮件/短信发送(无需等待发送结果返回给前端);
- 大文件导出、数据批量处理(避免阻塞主线程导致接口超时);
- 第三方接口调用(如调用支付回调接口,无需同步等待结果)。
1.3 @Async的核心作用
Spring的@Async注解基于AOP实现,通过动态代理机制,将被注解的方法封装到独立的线程中执行,从而实现异步调用。其核心价值在于:
- 简化异步编程:无需手动创建线程池、管理线程生命周期;
- 解耦线程管理与业务逻辑:开发者只需关注业务实现,线程池配置统一管理;
- 支持灵活配置:可自定义线程池参数、异常处理机制。
二、@Async基础使用:从环境搭建到第一个异步程序
2.1 环境依赖准备(Maven)
使用@Async需依赖Spring核心包,结合实战场景,我们搭建一个Spring Boot项目,核心依赖如下(所有版本采用最新稳定版):
<dependencies>
<!-- Spring Boot核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.5</version>
</dependency>
<!-- Spring Boot测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.2.5</version>
<scope>test</scope>
</dependency>
<!-- Lombok:简化日志、Getter/Setter等 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- FastJSON2:JSON处理 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.49</version>
</dependency>
<!-- Guava:集合工具类 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.1-jre</version>
</dependency>
<!-- MyBatis-Plus:持久层框架 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
<scope>runtime</scope>
</dependency>
<!-- Swagger3:接口文档 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>
2.2 启用@Async:@EnableAsync注解
要让Spring识别@Async注解,必须在配置类或启动类上添加@EnableAsync注解,该注解的作用是开启Spring的异步方法支持,底层会注册异步方法处理器。
package com.jam.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 应用启动类
* 开启异步支持:@EnableAsync
* @author ken
*/
@SpringBootApplication
@EnableAsync
public class AsyncDemoApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncDemoApplication.class, args);
}
}
2.3 第一个异步程序:基础使用示例
2.3.1 异步服务接口与实现
定义异步服务接口,在实现类的方法上添加@Async注解,标记该方法为异步方法。
package com.jam.demo.service;
/**
* 异步服务接口
* @author ken
*/
public interface AsyncService {
/**
* 基础异步方法:无返回值
* @param taskName 任务名称
*/
void basicAsyncTask(String taskName);
/**
* 异步方法:有返回值(返回Future)
* @param taskName 任务名称
* @param sleepTime 模拟耗时时间(毫秒)
* @return 任务执行结果
*/
Future<String> asyncTaskWithReturn(String taskName, long sleepTime);
}
package com.jam.demo.service.impl;
import com.jam.demo.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
* 异步服务实现类
* @author ken
*/
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
/**
* 基础异步方法:无返回值
* @Async:标记该方法为异步方法,使用默认线程池
* @param taskName 任务名称
*/
@Override
@Async
public void basicAsyncTask(String taskName) {
log.info("【异步任务】{} 开始执行,当前线程:{}", taskName, Thread.currentThread().getName());
// 模拟耗时操作(如日志记录、邮件发送)
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("【异步任务】{} 执行异常", taskName, e);
Thread.currentThread().interrupt();
}
log.info("【异步任务】{} 执行完毕", taskName);
}
/**
* 异步方法:有返回值
* 注意:有返回值的异步方法必须返回Future或其实现类(如FutureTask)
* @param taskName 任务名称
* @param sleepTime 模拟耗时时间(毫秒)
* @return 任务执行结果
*/
@Override
@Async
public Future<String> asyncTaskWithReturn(String taskName, long sleepTime) {
log.info("【异步任务(有返回值)】{} 开始执行,当前线程:{},预计耗时:{}ms",
taskName, Thread.currentThread().getName(), sleepTime);
try {
Thread.sleep(sleepTime);
String result = taskName + " 执行成功";
return new FutureTask<>(() -> result);
} catch (InterruptedException e) {
log.error("【异步任务(有返回值)】{} 执行异常", taskName, e);
Thread.currentThread().interrupt();
return new FutureTask<>(() -> taskName + " 执行失败");
}
}
}
2.3.2 测试接口:验证异步效果
编写Controller层接口,调用异步服务方法,验证异步执行效果。
package com.jam.demo.controller;
import com.jam.demo.service.AsyncService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Future;
/**
* 异步测试控制器
* @author ken
*/
@RestController
@RequestMapping("/async")
@Slf4j
@Tag(name = "异步测试接口", description = "用于测试@Async注解的基础使用")
public class AsyncTestController {
@Autowired
private AsyncService asyncService;
/**
* 测试基础异步方法(无返回值)
* @param taskName 任务名称
* @return 接口响应
*/
@GetMapping("/basic")
@Operation(summary = "基础异步方法测试", description = "调用无返回值的异步方法,验证异步执行效果")
public String testBasicAsync(@RequestParam String taskName) {
log.info("【主线程】开始调用异步方法,当前线程:{}", Thread.currentThread().getName());
// 调用异步方法
asyncService.basicAsyncTask(taskName);
log.info("【主线程】异步方法调用完成,无需等待结果,直接返回响应");
return "异步任务已触发,可查看日志确认执行情况";
}
/**
* 测试有返回值的异步方法
* @param taskName 任务名称
* @param sleepTime 模拟耗时时间(毫秒)
* @return 任务执行结果
* @throws Exception 异常
*/
@GetMapping("/with-return")
@Operation(summary = "有返回值异步方法测试", description = "调用有返回值的异步方法,通过Future获取执行结果")
public String testAsyncWithReturn(@RequestParam String taskName, @RequestParam long sleepTime) throws Exception {
log.info("【主线程】开始调用有返回值的异步方法,当前线程:{}", Thread.currentThread().getName());
// 调用异步方法,获取Future对象
Future<String> future = asyncService.asyncTaskWithReturn(taskName, sleepTime);
log.info("【主线程】异步方法调用完成,可继续执行其他逻辑");
// 模拟主线程其他业务操作
log.info("【主线程】执行其他业务逻辑...");
Thread.sleep(1000);
// 通过Future.get()获取异步任务结果(会阻塞,直到任务完成)
log.info("【主线程】开始获取异步任务结果");
String result = future.get();
log.info("【主线程】异步任务结果:{}", result);
return "异步任务执行结果:" + result;
}
}
2.3.3 测试结果与分析
- 测试无返回值异步方法(/async/basic?taskName=测试任务1): 日志输出如下(关键观察线程名称和执行顺序):
【主线程】开始调用异步方法,当前线程:http-nio-8080-exec-1
【主线程】异步方法调用完成,无需等待结果,直接返回响应
【异步任务】测试任务1 开始执行,当前线程:SimpleAsyncTaskExecutor-1
【异步任务】测试任务1 执行完毕
- 结论:主线程(http-nio-8080-exec-1)调用异步方法后,无需等待异步任务完成,直接返回响应;异步任务在独立线程(SimpleAsyncTaskExecutor-1)中执行。
- 测试有返回值异步方法(/async/with-return?taskName=测试任务2&sleepTime=3000): 日志输出如下:
【主线程】开始调用有返回值的异步方法,当前线程:http-nio-8080-exec-2
【主线程】异步方法调用完成,可继续执行其他逻辑
【主线程】执行其他业务逻辑...
【异步任务(有返回值)】测试任务2 开始执行,当前线程:SimpleAsyncTaskExecutor-2,预计耗时:3000ms
【主线程】开始获取异步任务结果
【异步任务(有返回值)】测试任务2 执行完毕
【主线程】异步任务结果:测试任务2 执行成功
- 结论:主线程调用异步方法后先执行自身业务逻辑,直到调用
future.get()才会阻塞等待异步任务完成;异步任务在独立线程中执行。
三、@Async进阶使用:自定义线程池
3.1 为什么需要自定义线程池?
默认情况下,@Async使用的是Spring提供的SimpleAsyncTaskExecutor,该线程池的核心问题的是:每次执行异步任务都会创建一个新线程,不会复用线程,当异步任务量较大时,会导致系统创建大量线程,引发线程上下文切换频繁、内存占用过高甚至OOM问题。
因此,在生产环境中,必须自定义线程池,统一管理线程的创建、复用、销毁,合理配置线程池参数。
3.2 自定义线程池的3种方式
3.2.1 方式1:通过@Configuration+@Bean创建ThreadPoolTaskExecutor
这是最常用的方式,通过配置类创建ThreadPoolTaskExecutor(Spring封装的线程池,基于JDK的ThreadPoolExecutor),并指定线程池参数。
package com.jam.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 自定义线程池配置类
* @author ken
*/
@Configuration
public class AsyncThreadPoolConfig {
/**
* 自定义线程池:asyncTaskExecutor
* 核心参数说明:
* 1. corePoolSize:核心线程数(默认活跃的线程数)
* 2. maxPoolSize:最大线程数(线程池可创建的最大线程数)
* 3. queueCapacity:队列容量(核心线程满后,任务放入队列等待)
* 4. keepAliveSeconds:非核心线程空闲存活时间(超过该时间则销毁)
* 5. threadNamePrefix:线程名称前缀(便于日志排查)
* 6. rejectedExecutionHandler:拒绝策略(任务过多时的处理方式)
* @return 自定义线程池
*/
@Bean(name = "asyncTaskExecutor")
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:根据CPU核心数配置(一般为CPU核心数 * 2 + 1)
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量:核心线程满后,任务放入队列,队列满后才会创建非核心线程
executor.setQueueCapacity(25);
// 非核心线程空闲存活时间:30秒
executor.setKeepAliveSeconds(30);
// 线程名称前缀
executor.setThreadNamePrefix("AsyncTask-");
// 拒绝策略:当线程池、队列都满时,直接抛出异常(生产环境可根据需求调整)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 初始化线程池(必须调用,否则线程池无法生效)
executor.initialize();
return executor;
}
}
3.2.2 方式2:实现AsyncConfigurer接口
通过实现AsyncConfigurer接口,重写getAsyncExecutor()方法返回自定义线程池,同时可重写getAsyncUncaughtExceptionHandler()方法自定义异步任务异常处理器。
package com.jam.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 实现AsyncConfigurer接口自定义线程池
* @author ken
*/
@Configuration
@Slf4j
public class AsyncConfigurerPoolConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("AsyncConfigurerTask-");
// 拒绝策略:丢弃最老的任务,执行新任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
executor.initialize();
return executor;
}
/**
* 自定义异步任务异常处理器:处理异步方法中未捕获的异常
* @return 异常处理器
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.error("【异步任务异常】方法:{},参数:{},异常信息:{}",
method.getName(), params, ex.getMessage(), ex);
};
}
}
3.2.3 方式3:使用@Async的value属性指定线程池
当系统中有多个线程池时,可通过@Async("线程池bean名称")指定具体使用哪个线程池。
package com.jam.demo.service.impl;
import com.jam.demo.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
* 多线程池场景下的异步服务实现
* @author ken
*/
@Service
@Slf4j
public class MultiPoolAsyncServiceImpl implements AsyncService {
/**
* 使用指定线程池:asyncTaskExecutor
* @param taskName 任务名称
*/
@Override
@Async("asyncTaskExecutor")
public void basicAsyncTask(String taskName) {
log.info("【异步任务(指定线程池)】{} 开始执行,当前线程:{}", taskName, Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("【异步任务(指定线程池)】{} 执行异常", taskName, e);
Thread.currentThread().interrupt();
}
log.info("【异步任务(指定线程池)】{} 执行完毕", taskName);
}
/**
* 使用默认线程池(AsyncConfigurer配置的线程池)
* @param taskName 任务名称
* @param sleepTime 模拟耗时时间(毫秒)
* @return 任务执行结果
*/
@Override
@Async
public Future<String> asyncTaskWithReturn(String taskName, long sleepTime) {
log.info("【异步任务(默认线程池)】{} 开始执行,当前线程:{},预计耗时:{}ms",
taskName, Thread.currentThread().getName(), sleepTime);
try {
Thread.sleep(sleepTime);
String result = taskName + " 执行成功(默认线程池)";
return new FutureTask<>(() -> result);
} catch (InterruptedException e) {
log.error("【异步任务(默认线程池)】{} 执行异常", taskName, e);
Thread.currentThread().interrupt();
return new FutureTask<>(() -> taskName + " 执行失败(默认线程池)");
}
}
}
3.3 线程池参数配置最佳实践
线程池参数的配置直接影响系统性能,需根据业务场景合理调整,核心配置原则如下:
| 参数 | 配置建议 | 适用场景 |
| corePoolSize | CPU核心数 * 2 + 1(CPU密集型);CPU核心数 * 10(IO密集型) | CPU密集型:计算任务;IO密集型:数据库查询、文件操作 |
| maxPoolSize | 不超过CPU核心数 * 20(避免线程过多导致上下文切换频繁) | 高并发场景可适当增大 |
| queueCapacity | 核心线程数 * 5 ~ 核心线程数 * 10(避免队列过大导致任务堆积) | 任务执行时间短、数量多的场景 |
| keepAliveSeconds | 30 ~ 60秒(非核心线程空闲时及时销毁,节省资源) | 大多数场景通用 |
| 拒绝策略 | 核心业务:AbortPolicy(抛出异常,便于监控);非核心业务:DiscardOldestPolicy/DiscardPolicy | 核心业务需保证任务不丢失;非核心业务可丢弃旧任务 |
四、@Async底层原理:从注解解析到动态代理
要真正掌握@Async,必须理解其底层实现原理。@Async基于Spring AOP机制,通过动态代理为目标方法创建代理对象,将异步调用逻辑织入代理方法中。
4.1 核心执行流程
4.2 关键组件解析
- @EnableAsync:开启异步支持,核心是导入
AsyncConfigurationSelector,该类会根据Spring版本选择对应的异步配置类(如ProxyAsyncConfiguration)。 - AsyncAnnotationBeanPostProcessor:后置处理器,用于扫描带有@Async注解的方法,为目标类创建动态代理(JDK动态代理或CGLIB代理)。
- AsyncTaskExecutor:异步任务执行器,即线程池,是异步调用的核心载体。
- AnnotationAsyncExecutionInterceptor:异步方法拦截器,代理对象调用方法时会被该拦截器拦截,负责将任务提交到线程池。
4.3 动态代理机制详解
当我们调用被@Async注解的方法时,实际调用的是代理对象的方法,而非目标对象的原始方法。代理对象的核心逻辑如下:
- 拦截目标方法调用;
- 解析@Async注解的属性(如指定的线程池名称);
- 获取对应的线程池;
- 将目标方法的执行逻辑封装为
Callable或Runnable任务; - 将任务提交到线程池,由线程池中的线程执行;
- 主线程直接返回(无返回值)或返回
Future对象(有返回值)。
4.4 源码片段解析(关键逻辑)
以下是AnnotationAsyncExecutionInterceptor中拦截方法的核心源码(简化版),清晰展示了异步任务的提交过程:
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 1. 获取目标方法
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
// 2. 解析@Async注解,获取线程池
AsyncTaskExecutor executor = determineAsyncExecutor(specificMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set");
}
// 3. 将目标方法封装为Callable任务
Callable<Object> task = () -> {
try {
// 执行目标方法
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
} catch (Throwable ex) {
// 处理异常
handleError(ex, specificMethod, invocation.getArguments());
}
return null;
};
// 4. 提交任务到线程池,返回Future对象
return doSubmit(task, executor, specificMethod);
}
五、@Async实战进阶:事务处理、异常处理与批量异步
5.1 异步方法与事务的关系
核心结论:@Async注解的方法与事务注解(@Transactional)同时使用时,事务不会生效。原因如下:
- 事务基于Spring AOP,需要通过代理对象调用才能生效;
- @Async的动态代理会将方法提交到线程池执行,此时目标方法的调用脱离了事务代理的上下文,事务注解无法被识别。
5.1.1 错误示例(事务不生效)
package com.jam.demo.service.impl;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import com.jam.demo.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 错误示例:异步方法+事务(事务不生效)
* @author ken
*/
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
/**
* 错误示例:@Async与@Transactional同时使用,事务不生效
* 原因:异步方法在独立线程执行,脱离了事务代理上下文
* @param user 用户信息
*/
@Override
@Async
@Transactional(rollbackFor = Exception.class)
public void asyncSaveUser(User user) {
userMapper.insert(user);
// 模拟异常
int i = 1 / 0;
}
}
5.1.2 正确示例(事务生效的异步方案)
要实现异步方法的事务控制,需将事务逻辑抽离到独立的服务方法中,由异步方法调用该事务方法(确保事务方法被代理对象调用)。
package com.jam.demo.service.impl;
import com.jam.demo.entity.User;
import com.jam.demo.mapper.UserMapper;
import com.jam.demo.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 正确示例:异步方法+事务(事务生效)
* 方案:将事务逻辑抽离到独立方法,异步方法调用事务方法
* @author ken
*/
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
/**
* 异步方法:仅负责触发异步执行,不包含事务逻辑
* @param user 用户信息
*/
@Override
@Async("asyncTaskExecutor")
public void asyncSaveUser(User user) {
log.info("【异步任务】开始执行用户保存,当前线程:{}", Thread.currentThread().getName());
// 调用事务方法
doSaveUser(user);
log.info("【异步任务】用户保存执行完毕");
}
/**
* 事务方法:独立的事务逻辑,由Spring代理对象调用
* @param user 用户信息
*/
@Transactional(rollbackFor = Exception.class)
public void doSaveUser(User user) {
userMapper.insert(user);
// 模拟异常,事务会回滚
int i = 1 / 0;
}
}
5.2 异步方法的异常处理
异步方法中如果发生未捕获的异常,由于线程是独立的,主线程无法感知,会导致异常丢失。因此,必须配置异常处理机制。
5.2.1 方式1:实现AsyncUncaughtExceptionHandler(全局异常处理)
如3.2.2节所示,通过实现AsyncConfigurer接口的getAsyncUncaughtExceptionHandler()方法,配置全局异步异常处理器,处理所有无返回值异步方法的未捕获异常。
5.2.2 方式2:通过Future获取异常(有返回值方法)
有返回值的异步方法会返回Future对象,调用Future.get()方法时,会将异步方法中的异常抛出,可通过try-catch捕获。
// 示例:捕获有返回值异步方法的异常
@GetMapping("/with-return-exception")
@Operation(summary = "有返回值异步方法异常处理测试")
public String testAsyncWithReturnException() {
log.info("【主线程】开始调用有返回值的异步方法");
Future<String> future = asyncService.asyncTaskWithReturn("测试异常任务", 2000);
try {
String result = future.get();
return result;
} catch (Exception e) {
log.error("【主线程】捕获异步任务异常", e);
return "异步任务执行失败:" + e.getMessage();
}
}
5.2.3 方式3:自定义异常处理器(局部异常处理)
通过@Async的exceptionHandler属性指定局部异常处理器(需Spring 4.1+版本支持)。
package com.jam.demo.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 自定义异步异常处理器(局部)
* @author ken
*/
@Component
@Slf4j
public class CustomAsyncExceptionHandler {
/**
* 处理异步方法异常
* @param ex 异常对象
* @param method 方法对象
* @param params 方法参数
*/
public void handleException(Throwable ex, String method, Object... params) {
log.error("【自定义异步异常】方法:{},参数:{},异常信息:{}", method, params, ex.getMessage(), ex);
}
}
// 使用局部异常处理器
@Async(exceptionHandler = "customAsyncExceptionHandler")
public void asyncTaskWithCustomExceptionHandler(String taskName) {
log.info("【异步任务(自定义异常处理器)】{} 开始执行", taskName);
// 模拟异常
int i = 1 / 0;
}
5.3 批量异步任务处理
在实际开发中,经常需要批量执行异步任务(如批量发送短信、批量处理数据),此时可通过CompletableFuture实现批量任务的并发执行、结果聚合、异常捕获。
5.3.1 批量异步任务示例(基于CompletableFuture)
package com.jam.demo.service.impl;
import com.jam.demo.service.BatchAsyncService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* 批量异步任务服务实现
* @author ken
*/
@Service
@Slf4j
public class BatchAsyncServiceImpl implements BatchAsyncService {
/**
* 单个批量任务的异步方法
* @param taskId 任务ID
* @return CompletableFuture<String> 任务执行结果
*/
@Async("asyncTaskExecutor")
public CompletableFuture<String> batchTask(Integer taskId) {
log.info("【批量异步任务】任务{} 开始执行,当前线程:{}", taskId, Thread.currentThread().getName());
try {
// 模拟耗时操作(如处理单条数据)
Thread.sleep(1000);
String result = "任务" + taskId + " 执行成功";
return CompletableFuture.completedFuture(result);
} catch (InterruptedException e) {
log.error("【批量异步任务】任务{} 执行异常", taskId, e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
}
/**
* 批量执行异步任务,聚合结果
* @param taskCount 任务数量
* @return 所有任务的执行结果
*/
@Override
public CompletableFuture<List<String>> executeBatchTasks(Integer taskCount) {
// 生成任务列表
List<Integer> taskIds = Lists.newArrayList();
for (int i = 1; i <= taskCount; i++) {
taskIds.add(i);
}
// 批量提交异步任务,获取CompletableFuture列表
List<CompletableFuture<String>> futureList = taskIds.stream()
.map(this::batchTask)
.toList();
// 聚合所有任务结果:当所有任务完成后,收集结果
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(v -> futureList.stream()
.map(CompletableFuture::join)
.toList());
}
}
5.3.2 测试批量异步任务
@GetMapping("/batch")
@Operation(summary = "批量异步任务测试", description = "批量执行异步任务,聚合所有任务结果")
public CompletableFuture<String> testBatchAsyncTasks(@RequestParam Integer taskCount) {
log.info("【主线程】开始批量提交异步任务,任务数量:{}", taskCount);
CompletableFuture<List<String>> batchFuture = batchAsyncService.executeBatchTasks(taskCount);
return batchFuture.thenApply(results -> {
log.info("【主线程】所有批量异步任务执行完毕,结果:{}", results);
return "批量任务执行完成,共" + results.size() + "个任务,结果:" + results;
});
}
六、@Async常见坑与避坑指南
6.1 坑1:异步方法不生效
6.1.1 常见原因
- 未添加
@EnableAsync注解; - 异步方法为private修饰(Spring AOP无法拦截private方法);
- 异步方法被同一个类中的其他方法调用(内部调用,未经过代理对象);
- 自定义线程池未调用
initialize()方法(线程池未初始化); - 依赖注入的是目标对象而非代理对象(如使用new关键字创建对象)。
6.1.2 避坑方案
- 确保启动类或配置类上添加
@EnableAsync; - 异步方法必须为public修饰;
- 避免内部调用:异步方法和调用方必须在不同的类中;
- 自定义线程池时,必须调用
executor.initialize(); - 依赖注入使用
@Autowired或@Resource,避免使用new关键字创建对象。
6.1.3 错误示例与正确示例
// 错误示例1:内部调用(异步不生效)
@Service
public class AsyncErrorService {
// 内部调用异步方法,未经过代理对象
public void callAsyncMethod() {
asyncMethod();
}
@Async
public void asyncMethod() {
// 异步逻辑
}
}
// 错误示例2:private修饰(异步不生效)
@Service
public class AsyncErrorService {
@Async
private void asyncMethod() {
// 异步逻辑
}
}
// 正确示例:不同类调用(异步生效)
@Service
public class AsyncCallerService {
@Autowired
private AsyncService asyncService;
// 调用不同类中的异步方法,经过代理对象
public void callAsyncMethod() {
asyncService.basicAsyncTask("正确示例任务");
}
}
6.2 坑2:线程池耗尽
6.2.1 常见原因
- 使用默认线程池
SimpleAsyncTaskExecutor(每次创建新线程,无上限); - 线程池参数配置不合理(核心线程数、最大线程数过小,队列容量过小);
- 异步任务执行时间过长,导致线程被长时间占用;
- 任务提交速度超过线程池处理速度,导致任务堆积、线程池满负荷。
6.2.2 避坑方案
- 生产环境禁用默认线程池,必须自定义线程池;
- 根据业务场景合理配置线程池参数(参考3.3节最佳实践);
- 监控异步任务执行时间,优化耗时任务(如拆分大任务、优化SQL);
- 配置合理的拒绝策略,避免任务过多时系统崩溃;
- 对异步任务进行限流,避免短时间内提交大量任务。
6.3 坑3:事务不生效
6.3.1 常见原因
如5.1节所述,@Async与@Transactional同时使用时,异步方法脱离了事务代理上下文,导致事务不生效。
6.3.2 避坑方案
将事务逻辑抽离到独立的public方法中,由异步方法调用该事务方法(确保事务方法被代理对象调用),具体示例参考5.1.2节。
6.4 坑4:异常丢失
6.4.1 常见原因
无返回值的异步方法中发生未捕获的异常,由于线程独立,主线程无法感知,导致异常丢失。
6.4.2 避坑方案
配置全局或局部异常处理器(参考5.2节),确保所有异步方法的异常都能被捕获和记录。
6.5 坑5:异步方法返回值错误
6.5.1 常见原因
有返回值的异步方法未返回Future或其实现类(如直接返回String、Integer等基本类型),导致无法获取异步执行结果。
6.5.2 避坑方案
有返回值的异步方法必须返回Future或其实现类(如FutureTask、CompletableFuture),通过Future.get()获取执行结果。
// 错误示例:返回值不是Future(无法获取异步结果)
@Async
public String asyncTaskWithWrongReturn(String taskName) {
return taskName + " 执行成功";
}
// 正确示例:返回Future(可获取异步结果)
@Async
public Future<String> asyncTaskWithCorrectReturn(String taskName) {
return new FutureTask<>(() -> taskName + " 执行成功");
}
七、@Async实战案例:用户注册异步通知系统
7.1 案例需求
用户注册成功后,需要执行以下3个异步任务:
- 发送注册成功短信;
- 发送注册成功邮件;
- 记录用户注册日志到数据库。
要求:3个任务并发执行,提升注册接口响应速度;确保每个任务的异常都能被捕获;记录日志的任务需要事务支持(确保日志数据入库成功)。
7.2 案例实现
7.2.1 数据库表设计(MySQL)
-- 用户表
CREATE TABLE `user` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`phone` varchar(20) NOT NULL COMMENT '手机号',
`email` varchar(100) NOT NULL COMMENT '邮箱',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';
-- 注册日志表
CREATE TABLE `user_register_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`register_ip` varchar(50) NOT NULL COMMENT '注册IP',
`log_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '日志时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户注册日志表';
7.2.2 实体类
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户实体类
* @author ken
*/
@Data
@TableName("user")
public class User {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户名
*/
private String username;
/**
* 手机号
*/
private String phone;
/**
* 邮箱
*/
private String email;
/**
* 创建时间
*/
private LocalDateTime createTime;
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 用户注册日志实体类
* @author ken
*/
@Data
@TableName("user_register_log")
public class UserRegisterLog {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户ID
*/
private Long userId;
/**
* 注册IP
*/
private String registerIp;
/**
* 日志时间
*/
private LocalDateTime logTime;
}
7.2.3 Mapper层
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.User;
import org.springframework.stereotype.Repository;
/**
* 用户Mapper
* @author ken
*/
@Repository
public interface UserMapper extends BaseMapper<User> {
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.UserRegisterLog;
import org.springframework.stereotype.Repository;
/**
* 注册日志Mapper
* @author ken
*/
@Repository
public interface UserRegisterLogMapper extends BaseMapper<UserRegisterLog> {
}
7.2.4 服务层
package com.jam.demo.service;
import com.jam.demo.entity.User;
import com.jam.demo.entity.UserRegisterLog;
/**
* 注册相关服务
* @author ken
*/
public interface RegisterService {
/**
* 用户注册(同步方法:保存用户信息)
* @param user 用户信息
* @param registerIp 注册IP
* @return 注册成功的用户ID
*/
Long userRegister(User user, String registerIp);
/**
* 发送注册成功短信(异步方法)
* @param phone 手机号
*/
void sendRegisterSms(String phone);
/**
* 发送注册成功邮件(异步方法)
* @param email 邮箱
*/
void sendRegisterEmail(String email);
/**
* 记录注册日志(异步+事务方法)
* @param userId 用户ID
* @param registerIp 注册IP
*/
void recordRegisterLog(Long userId, String registerIp);
}
package com.jam.demo.service.impl;
import com.jam.demo.entity.User;
import com.jam.demo.entity.UserRegisterLog;
import com.jam.demo.mapper.UserMapper;
import com.jam.demo.mapper.UserRegisterLogMapper;
import com.jam.demo.service.RegisterService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* 注册相关服务实现
* @author ken
*/
@Service
@Slf4j
public class RegisterServiceImpl implements RegisterService {
@Autowired
private UserMapper userMapper;
@Autowired
private UserRegisterLogMapper userRegisterLogMapper;
/**
* 用户注册(同步方法:保存用户信息)
* 注:用户注册是核心流程,需同步执行,确保用户信息入库成功后再触发异步任务
* @param user 用户信息
* @param registerIp 注册IP
* @return 注册成功的用户ID
*/
@Override
public Long userRegister(User user, String registerIp) {
log.info("【用户注册】开始保存用户信息,用户名:{}", user.getUsername());
// 保存用户信息
user.setCreateTime(LocalDateTime.now());
userMapper.insert(user);
Long userId = user.getId();
log.info("【用户注册】用户信息保存成功,用户ID:{}", userId);
// 触发3个异步任务(并发执行)
sendRegisterSms(user.getPhone());
sendRegisterEmail(user.getEmail());
recordRegisterLog(userId, registerIp);
return userId;
}
/**
* 发送注册成功短信(异步方法)
* @param phone 手机号
*/
@Override
@Async("asyncTaskExecutor")
public void sendRegisterSms(String phone) {
log.info("【异步任务-发送短信】开始向手机号:{} 发送注册成功短信,当前线程:{}",
phone, Thread.currentThread().getName());
// 模拟短信发送耗时
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
log.error("【异步任务-发送短信】向手机号:{} 发送短信异常", phone, e);
Thread.currentThread().interrupt();
}
log.info("【异步任务-发送短信】向手机号:{} 发送短信成功", phone);
}
/**
* 发送注册成功邮件(异步方法)
* @param email 邮箱
*/
@Override
@Async("asyncTaskExecutor")
public void sendRegisterEmail(String email) {
log.info("【异步任务-发送邮件】开始向邮箱:{} 发送注册成功邮件,当前线程:{}",
email, Thread.currentThread().getName());
// 模拟邮件发送耗时
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error("【异步任务-发送邮件】向邮箱:{} 发送邮件异常", email, e);
Thread.currentThread().interrupt();
}
log.info("【异步任务-发送邮件】向邮箱:{} 发送邮件成功", email);
}
/**
* 记录注册日志(异步+事务方法)
* 注:事务逻辑抽离到独立方法,确保事务生效
* @param userId 用户ID
* @param registerIp 注册IP
*/
@Override
@Async("asyncTaskExecutor")
public void recordRegisterLog(Long userId, String registerIp) {
log.info("【异步任务-记录日志】开始记录用户:{} 的注册日志,当前线程:{}",
userId, Thread.currentThread().getName());
doRecordRegisterLog(userId, registerIp);
log.info("【异步任务-记录日志】用户:{} 的注册日志记录成功", userId);
}
/**
* 事务方法:实际执行日志记录逻辑
* @param userId 用户ID
* @param registerIp 注册IP
*/
@Transactional(rollbackFor = Exception.class)
public void doRecordRegisterLog(Long userId, String registerIp) {
UserRegisterLog log = new UserRegisterLog();
log.setUserId(userId);
log.setRegisterIp(registerIp);
log.setLogTime(LocalDateTime.now());
userRegisterLogMapper.insert(log);
// 模拟异常(测试事务回滚)
// int i = 1 / 0;
}
}
7.2.5 控制器层
package com.jam.demo.controller;
import com.jam.demo.entity.User;
import com.jam.demo.service.RegisterService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.util.StringUtils;
/**
* 用户注册控制器
* @author ken
*/
@RestController
@RequestMapping("/register")
@Slf4j
@Tag(name = "用户注册接口", description = "用户注册核心接口,包含异步通知功能")
public class RegisterController {
@Autowired
private RegisterService registerService;
/**
* 用户注册接口
* @param user 用户信息
* @param registerIp 注册IP
* @return 注册结果
*/
@PostMapping
@Operation(summary = "用户注册", description = "用户注册成功后,异步发送短信、邮件,记录注册日志")
public String userRegister(@RequestBody User user, @RequestParam String registerIp) {
// 参数校验
if (ObjectUtils.isEmpty(user) || !StringUtils.hasText(user.getUsername())
|| !StringUtils.hasText(user.getPhone()) || !StringUtils.hasText(user.getEmail())) {
return "参数错误:用户名、手机号、邮箱不能为空";
}
if (!StringUtils.hasText(registerIp)) {
return "参数错误:注册IP不能为空";
}
log.info("【用户注册接口】开始处理注册请求,用户名:{}", user.getUsername());
// 执行注册(同步),触发异步任务
Long userId = registerService.userRegister(user, registerIp);
log.info("【用户注册接口】注册请求处理完成,用户ID:{},已触发异步通知任务", userId);
return "注册成功,用户ID:" + userId;
}
}
7.3 案例测试与结果分析
发送POST请求到/register,请求参数如下:
{
"username": "test_user",
"phone": "13800138000",
"email": "test@example.com"
}
请求参数registerIp:127.0.0.1
日志输出如下(关键观察线程名称和执行顺序):
【用户注册接口】开始处理注册请求,用户名:test_user
【用户注册】开始保存用户信息,用户名:test_user
【用户注册】用户信息保存成功,用户ID:1
【异步任务-发送短信】开始向手机号:13800138000 发送注册成功短信,当前线程:AsyncTask-1
【异步任务-发送邮件】开始向邮箱:test@example.com 发送注册成功邮件,当前线程:AsyncTask-2
【异步任务-记录日志】开始记录用户:1 的注册日志,当前线程:AsyncTask-3
【用户注册接口】注册请求处理完成,用户ID:1,已触发异步通知任务
【异步任务-发送短信】向手机号:13800138000 发送短信成功
【异步任务-记录日志】用户:1 的注册日志记录成功
【异步任务-发送邮件】向邮箱:test@example.com 发送邮件成功
结果分析:
- 核心流程同步执行:用户信息保存是核心流程,同步执行确保用户注册成功后才触发后续异步任务;
- 异步任务并发执行:发送短信、发送邮件、记录日志3个任务在3个独立线程(AsyncTask-1、AsyncTask-2、AsyncTask-3)中并发执行,无需顺序等待;
- 接口响应快速:主线程(用户注册接口)在触发异步任务后立即返回响应,无需等待异步任务完成,提升了接口响应速度;
- 异常隔离:每个异步任务的异常都被独立捕获,不会影响其他任务和主线程的执行。
八、@Async监控与调优
8.1 异步任务监控
在生产环境中,需要对异步任务的执行状态、线程池状态进行监控,以便及时发现问题。常用监控方案如下:
8.1.1 基于Spring Boot Actuator监控线程池
Spring Boot Actuator提供了线程池监控端点,可通过配置暴露线程池相关指标。
- 添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>3.2.5</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.12.5</version>
</dependency>
- 配置application.yml:
spring:
application:
name: async-demo
management:
endpoints:
web:
exposure:
include: health,info,metrics,threadpool # 暴露线程池监控端点
metrics:
tags:
application: ${spring.application.name}
export:
prometheus:
enabled: true # 启用Prometheus导出指标
- 自定义线程池监控指标:
package com.jam.demo.config;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池监控配置
* @author ken
*/
@Configuration
public class ThreadPoolMonitorConfig {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
@Qualifier("asyncTaskExecutor")
private ThreadPoolTaskExecutor asyncTaskExecutor;
/**
* 注册线程池监控指标
*/
@PostConstruct
public void monitorThreadPool() {
ThreadPoolExecutor executor = asyncTaskExecutor.getThreadPoolExecutor();
// 核心线程数
Gauge.builder("threadpool.core.size", executor, ThreadPoolExecutor::getCorePoolSize)
.tag("threadpool.name", "asyncTaskExecutor")
.register(meterRegistry);
// 活跃线程数
Gauge.builder("threadpool.active.size", executor, ThreadPoolExecutor::getActiveCount)
.tag("threadpool.name", "asyncTaskExecutor")
.register(meterRegistry);
// 最大线程数
Gauge.builder("threadpool.max.size", executor, ThreadPoolExecutor::getMaximumPoolSize)
.tag("threadpool.name", "asyncTaskExecutor")
.register(meterRegistry);
// 队列中的任务数
Gauge.builder("threadpool.queue.size", executor, e -> e.getQueue().size())
.tag("threadpool.name", "asyncTaskExecutor")
.register(meterRegistry);
// 已完成的任务数
Gauge.builder("threadpool.completed.tasks", executor, ThreadPoolExecutor::getCompletedTaskCount)
.tag("threadpool.name", "asyncTaskExecutor")
.register(meterRegistry);
}
}
- 访问监控端点:
- 查看线程池指标:
http://localhost:8080/actuator/metrics/threadpool.active.size?tag=threadpool.name:asyncTaskExecutor - 查看Prometheus指标:
http://localhost:8080/actuator/prometheus(可结合Grafana可视化展示)
8.1.2 自定义异步任务执行日志
通过AOP切面,记录异步任务的执行时间、参数、结果等信息,便于问题排查。
package com.jam.demo.aspect;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
/**
* 异步任务执行日志切面
* @author ken
*/
@Aspect
@Component
@Slf4j
public class AsyncTaskLogAspect {
/**
* 切入点:所有被@Async注解的方法
*/
@Pointcut("@annotation(org.springframework.scheduling.annotation.Async)")
public void asyncTaskPointcut() {}
/**
* 环绕通知:记录任务执行时间、参数、结果
* @param joinPoint 连接点
* @return 任务执行结果
* @throws Throwable 异常
*/
@Around("asyncTaskPointcut()")
public Object aroundAsyncTask(ProceedingJoinPoint joinPoint) throws Throwable {
// 记录开始时间
long startTime = System.currentTimeMillis();
String methodName = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
Object[] args = joinPoint.getArgs();
log.info("【异步任务监控】{} 开始执行,参数:{},当前线程:{}",
methodName, args, Thread.currentThread().getName());
try {
// 执行目标方法
Object result = joinPoint.proceed();
// 记录执行时间和结果
long costTime = System.currentTimeMillis() - startTime;
log.info("【异步任务监控】{} 执行完成,耗时:{}ms,结果:{}",
methodName, costTime, result);
return result;
} catch (Throwable e) {
log.error("【异步任务监控】{} 执行异常,耗时:{}ms",
methodName, System.currentTimeMillis() - startTime, e);
throw e;
}
}
}
8.2 异步任务调优
8.2.1 线程池参数动态调优
在生产环境中,线程池参数可能需要根据业务流量动态调整,可通过以下方案实现:
- 基于配置中心(如Nacos、Apollo)动态刷新线程池参数;
- 提供接口手动调整线程池参数(需做好权限控制)。
示例:基于Nacos动态调整线程池参数
package com.jam.demo.config;
import com.alibaba.nacos.api.config.annotation.NacosConfigListener;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
/**
* 基于Nacos的线程池动态配置
* @author ken
*/
@Configuration
public class DynamicThreadPoolConfig {
@Resource
@Qualifier("asyncTaskExecutor")
private ThreadPoolTaskExecutor asyncTaskExecutor;
// 从Nacos获取核心线程数配置
@NacosValue(value = "${threadpool.async.core-size:5}", autoRefreshed = true)
private int corePoolSize;
// 从Nacos获取最大线程数配置
@NacosValue(value = "${threadpool.async.max-size:10}", autoRefreshed = true)
private int maxPoolSize;
// 从Nacos获取空闲存活时间配置
@NacosValue(value = "${threadpool.async.keep-alive-seconds:30}", autoRefreshed = true)
private int keepAliveSeconds;
/**
* 监听配置变化,动态调整线程池参数
*/
@NacosConfigListener(dataId = "async-demo-threadpool-config", groupId = "DEFAULT_GROUP")
public void refreshThreadPoolConfig(String config) {
// 解析配置(此处简化,实际需解析JSON/Properties格式)
// 动态调整核心线程数
asyncTaskExecutor.setCorePoolSize(corePoolSize);
// 动态调整最大线程数
asyncTaskExecutor.setMaxPoolSize(maxPoolSize);
// 动态调整空闲存活时间
asyncTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
// 重新初始化线程池(仅调整核心线程数、最大线程数、空闲存活时间时无需重新初始化)
asyncTaskExecutor.initialize();
}
}
8.2.2 任务拆分与合并
对于执行时间过长的大任务,可拆分为多个小任务并发执行,提升执行效率;对于大量小任务,可合并为批次任务执行,减少线程切换开销。
示例:大任务拆分
/**
* 大任务拆分示例:批量处理1000条数据,拆分为10个小任务,每个任务处理100条
* @param dataList 待处理数据列表
* @return 处理结果
*/
@Async("asyncTaskExecutor")
public CompletableFuture<Void> processLargeData(List<String> dataList) {
// 拆分任务:每100条数据为一个子任务
List<List<String>> subTasks = Lists.partition(dataList, 100);
// 并发执行子任务
List<CompletableFuture<Void>> futureList = subTasks.stream()
.map(subList -> CompletableFuture.runAsync(() -> processSubTask(subList), asyncTaskExecutor))
.toList();
// 等待所有子任务完成
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
}
/**
* 子任务处理逻辑
* @param subList 子任务数据列表
*/
private void processSubTask(List<String> subList) {
log.info("【子任务】开始处理 {} 条数据,当前线程:{}", subList.size(), Thread.currentThread().getName());
// 处理逻辑
subList.forEach(data -> {
// 数据处理操作
});
log.info("【子任务】数据处理完成");
}
8.2.3 避免异步任务嵌套
异步任务内部尽量不要嵌套调用其他异步任务,否则会导致线程池资源被过度占用,增加系统复杂度和排查难度。若必须嵌套,需严格控制嵌套层级和任务数量。
九、总结与核心要点回顾
@Async注解是Spring框架中实现异步编程的核心工具,其使用简单,但要在生产环境中稳定运行,需掌握以下核心要点:
- 基础使用:必须添加
@EnableAsync开启异步支持,异步方法需为public修饰,避免内部调用; - 线程池配置:生产环境禁用默认线程池,自定义线程池需合理配置核心参数,调用
initialize()初始化; - 事务处理:@Async与@Transactional同时使用时事务不生效,需将事务逻辑抽离到独立方法;
- 异常处理:通过
AsyncUncaughtExceptionHandler或Future捕获异常,避免异常丢失; - 避坑指南:重点关注异步不生效、线程池耗尽、事务失效、异常丢失等常见问题;
- 监控调优:通过Actuator、自定义日志实现监控,结合配置中心实现动态调优,合理拆分/合并任务提升效率。
通过本文的讲解和实战示例,相信你已全面掌握@Async注解的使用方法和底层逻辑。在实际开发中,需结合业务场景灵活运用异步编程,平衡系统吞吐量和稳定性,让异步成为提升系统性能的利器。