一、概述
Java中的异步编程是一种能够提高程序性能和响应速度的技术。它通过将耗时的操作放在单独的线程中,让主线程继续执行其他任务,从而实现并发处理和异步执行。在Java中,异步编程常用的方式有多线程、Future和CompletableFuture等。在实际应用中,异步编程可以优化网络请求、数据库操作等IO密集型任务的性能,提高程序的响应速度和吞吐量。虽然异步编程可以带来许多好处,但同时也涉及到一些问题,比如线程安全、回调地狱等。因此,在使用异步编程时需要注意合理地设计和管理线程,确保程序的正确性和可维护性。
本文主要总结几种常见的异步编程方案,包括:(1)基于Thread的多线程实现;(2)实现CompletableFuture接口;(3)通过@Async注解实现;(4)基于ApplicationEvent事件订阅;(5)基于消息中间件;
二、异步场景说明
异步编程是让程序并发运行的一种手段。它允许多个事件同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行。
他的核心思路是:采用多线程优化性能,将串行操作变成并行操作。异步模式设计的程序可以显著减少线程等待,从而在高吞吐量场景中,极大提升系统的整体性能,显著降低时延。
三、JDK原生方案
1.基于Thread的多线程
直接继承 Thread类 是创建异步线程最简单的方式。
首先,创建 Thread 子类,普通类或匿名内部类方式;然后创建子类实例;最后通过 start()方法启动线程。
public class ThreadTest implements Runnable{
public static void main(String[] args) {
Thread thread = new Thread(new ThreadTest());
thread.start();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
由于需要频繁的创建和销毁线程,这样的操作会对系统资源造成浪费,所以引入了线程池来管理多线性。以下代码的static ExecutorService executorService = Executors.newFixedThreadPool(3);
就是创建线程池的操作。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
上述方式虽然达到了多线程并行处理,但有些业务不仅仅要执行过程,还要获取执行后的结果。Java 从 1.5 版本开始,提供了 Callable 和 Future,可以在任务执行完毕之后得到任务执行结果。 当然也提供了其他功能,如:取消任务、查询任务是否完成等。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureTest {
static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
Future future = executorService.submit(
new Callable() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName());
Thread.sleep((long) (1000 + Math.random() * 1000));
return Thread.currentThread().getName();
}
}
);
System.out.println("threadName = " + future);
}
}
}
2.实现CompletableFuture接口
通过Future来获取异步线程的消息有个弊端是他是同步阻塞的,是需要主线程通过future.get()方法来查询起结果的,为了解决这个问题,CompletableFuture可以通过回调的方式来处理计算结果,实现了异步非阻塞,性能更优。CompletableFuture可以实现的功能包括:(1)任务执行成功后,会回调某个执行成功的方法;(2)任务执行失败后,会回调某个执行失败的方法;
import java.util.concurrent.CompletableFuture;
/**
* The type Completable futurn compose test.
*
* @author yangnk
*/
public class CompletableFuturnComposeTest {
/**
* Completable futurn compose.
*/
public static void completableFuturnCompose() {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
completableFuture.whenComplete((s, throwable) -> {
System.out.println(s);
});
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
completableFuture1.whenComplete((s, throwable) -> {
System.out.println(s);
});
CompletableFuture completableFuture2 = completableFuture1.thenApplyAsync(t ->{
return "after " + t;
});
completableFuture2.whenComplete((s, throwable) -> {
System.out.println(s);
});
CompletableFuture.allOf(completableFuture, completableFuture1).join();
System.out.println("1." + System.currentTimeMillis());
}
/**
* The entry point of application.
*
* @param args the input arguments
*/
public static void main(String[] args) {
completableFuturnCompose();
System.out.println("2." + System.currentTimeMillis());
}
}
四、集成Spring的方案
1.基于@Async注解
SpringBoot框架还提供了一种基于注解的方案,该方案以方法体为界,使方法体内部的代码逻辑全部按异步方式执行。为了启用异步注解,首先需要使用@EnableAsync。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class SpringDemo1Application {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringDemo1Application.class);
ConfigurableApplicationContext context = application.run(args);
}
}
然后再自定义线程池:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class TaskPoolConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
//返回可用处理器的Java虚拟机的数量 12
int i = Runtime.getRuntime().availableProcessors();
System.out.println("系统最大线程数 : " + i);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(16);
//最大线程数
executor.setMaxPoolSize(20);
//配置队列容量,默认值为Integer.MAX_VALUE
executor.setQueueCapacity(99999);
//活跃时间
executor.setKeepAliveSeconds(60);
//线程名字前缀
executor.setThreadNamePrefix("asyncServiceExecutor -");
//设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
executor.setAwaitTerminationSeconds(60);
//等待所有的任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
在异步处理的方法上,最后添加了注解 @Async。当调用 execute 方法时,通过自定义的线程池 taskExecutor,实现了对 execute 方法的异步化执行。这样可以提高程序的效率和性能。
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
interface AsyncService {
String sendSms(String callPrefix, String mobile, String actionType, String content);
String sendEmail(String email, String subject, String content);
}
@Slf4j
@Service
public class AsyncTest implements AsyncService {
@Override
@Async("taskExecutor")
public String sendSms(String callPrefix, String mobile, String actionType, String content) {
try {
Thread.sleep(1000);
System.out.println( "发送sms成功");
} catch (Exception e) {
log.error("发送短信异常 -> ", e);
}
return "发送sms成功";
}
@Override
@Async("taskExecutor")
public String sendEmail(String email, String subject, String content) {
try {
Thread.sleep(1000);
System.out.println( "发送email成功");
} catch (Exception e) {
log.error("发送email异常 -> ", e);
}
return "发送成功";
}
}
注意事项:由于通过 @Async注解来实现异步调用本质上是通过AOP代理实现的,代理类调用自身的方法会失效,所以加了 @Async注解的方法无法直接调用自己类中的方法,这样操作会调用失败。
总结一下在Springboot中基于@Async注解来实现异步调用:
- 调用异步方法类上或者启动类加上注解 @EnableAsync
- 在需要被异步调用的方法外加上 @Async
- 所使用的 @Async 注解方法的类对象应该是 Spring 容器管理的 bean 对象;
2.基于ApplicationEvent事件订阅
事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。本质上,这是标准的观察者设计模式。 ApplicationEvent 是由 Spring 提供的所有 Event 类的基类。
首先,需要自定义业务事件子类,继承自 ApplicationEvent,通过泛型注入业务模型参数类。相当于 MQ 的消息体。
import org.springframework.context.ApplicationEvent;
//自定义事件
public class ApplicationEventTest extends ApplicationEvent {
public ApplicationEventTest(Object source) {
super(source);
}
/**
* 事件处理事项
* @param msg
*/
public void printMsg(String msg)
{
System.out.println("监听到事件:" + ApplicationEventTest.class + ", 时间为:" + msg);
}
}
然后编写事件监听器。ApplicationListener接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener。相当于 MQ 的消费端。
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Date;
//自定义事件监听器
@Component
public class ApplicationListenerTest implements ApplicationListener {
@Override
public void onApplicationEvent(ApplicationEventTest event) {
event.printMsg(String.valueOf(new Date()));
}
}
最后发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class ApplicationEventPubTest implements ApplicationContextAware {
ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@RequestMapping("/hello1")
public String index() {
applicationContext.publishEvent(new ApplicationEventTest(new Object()));
return "Hello World.";
}
}
五、消息中间件实现
异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。
消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式和发布订阅模式两种。消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。 当然市面上消息队列框架非常多,常见的有 RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等。借助消息队列这个中间件可以高效的实现异步编程。
六、代码实现(未更新到博客)
参考资料
- 为什么都不建议直接使用 @Async 注解实现异步?:https://juejin.cn/post/7099328896142671903#heading-6
- Java实现异步编程的8种方式:https://juejin.cn/post/7165147306688249870 (主要参考)
- 一文带你彻底了解 Java 异步编程:https://xie.infoq.cn/article/37052813d58d4a3a41714c1d0
- 7 种 Java 异步编程实现方式!:https://zhuanlan.zhihu.com/p/572874448