背景
异步编程是一种解决并发问题的方式,它允许程序在执行某个任务时,不需要等待该任务完成,而是可以继续执行后续的任务。通过异步编程,我们可以将长时间运行的任务交给其他线程或进程来处理,从而提高程序的性能和响应性。
异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件、异步更新等,这些都是典型的可以通过异步实现的场景。
过程
异步实现的方式
1、线程Thread
2、Future
3、异步框架CompletableFuture
4、消息队列
什么是异步
在同步操作中,我们执行到 发送短信 的时候,我们必须等待这个方法彻底执行完才能执行 赠送积分 这个操作,如果 赠送积分 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。
实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分和发送短信这两个操作能够同时进行,比如:
这就是所谓的异步,是不是非常简单,下面就说说异步的几种实现方式吧。
详解异步的几种方式
1、线程Thread
package asynchronous.Eight; public class AsyncThread1 { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); sendSMS(); sendPoints(); long endTime = System.currentTimeMillis(); System.out.println("--costTime"+(endTime-startTime)+"ms"); } // public static void main(String[] args) { // // long startTime = System.currentTimeMillis(); // // 创建并启动两个线程 // Thread smsThread = new Thread(() -> { // try { // sendSMS(); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // }); // Thread pointsThread = new Thread(() -> { // try { // sendPoints(); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } // }); // // smsThread.start(); // pointsThread.start(); // // try { // // 等待两个线程执行结束 // smsThread.join(); // pointsThread.join(); // } catch (InterruptedException e) { // e.printStackTrace(); // } // // long endTime = System.currentTimeMillis(); // System.out.println("All tasks are completed."); // System.out.println("--costTime"+(endTime-startTime)+"ms"); // // } // 假设以下是发送短信和赠送积分的方法 public static void sendSMS() throws InterruptedException { // 发送短信的逻辑 Thread.sleep(300); System.out.println("Sending SMS..."); } public static void sendPoints() throws InterruptedException { // 赠送积分的逻辑 Thread.sleep(300); System.out.println("Sending Points..."); } }
2、Future
package asynchronous.Eight; import java.util.concurrent.*; public class FutureManager2 { public static void main(String[] args) { long startTime = System.currentTimeMillis(); ExecutorService executor = Executors.newFixedThreadPool(2); // 提交两个任务并获取Future对象 Future<Void> smsFuture = executor.submit(() -> { sendSMS(); return null; }); Future<Void> pointsFuture = executor.submit(() -> { sendPoints(); return null; }); try { // 等待两个任务执行结束 smsFuture.get(); pointsFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { // 关闭线程池 executor.shutdown(); } System.out.println("All tasks are completed."); long endTime = System.currentTimeMillis(); System.out.println("--costTime"+(endTime-startTime)+"ms"); } // 假设以下是发送短信和赠送积分的方法 public static void sendSMS() throws InterruptedException { // 发送短信的逻辑 Thread.sleep(300); System.out.println("Sending SMS..."); } public static void sendPoints() throws InterruptedException { Thread.sleep(300); // 赠送积分的逻辑 System.out.println("Sending Points..."); } }
Future的不足之处的包括以下几点:
1️⃣ 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
2️⃣ Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。
3️⃣ Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。
3、异步框架CompleteableFuture
package asynchronous.Eight; import java.util.concurrent.CompletableFuture; public class CompletableFutureCompose3 { public static void main(String[] args) { long startTime = System.currentTimeMillis(); // 创建并执行两个CompletableFuture任务 CompletableFuture<Void> smsFuture = CompletableFuture.runAsync(() -> sendSMS()); CompletableFuture<Void> pointsFuture = CompletableFuture.runAsync(() -> sendPoints()); // 等待两个任务执行结束 CompletableFuture<Void> allTasks = CompletableFuture.allOf(smsFuture, pointsFuture); // 在所有任务完成后执行回调 allTasks.thenRun(() -> { long endTime = System.currentTimeMillis(); long duration = endTime - startTime; System.out.println("All tasks are completed in " + duration + " milliseconds."); }); // 阻塞直到所有任务完成 allTasks.join(); } // 假设以下是发送短信和赠送积分的方法 public static void sendSMS() { // 发送短信的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Sending SMS..."); } public static void sendPoints() { // 赠送积分的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Sending Points..."); } }
4、@Async
package asynchronous.Eight; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @EnableAsync public class MainAnnotation4 implements AsyncConfigurer { public static void main(String[] args) { MainAnnotation4 main = new MainAnnotation4(); main.runAsyncTasks(); } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("MyAsyncThread-"); executor.initialize(); return executor; } public void runAsyncTasks() { long startTime = System.currentTimeMillis(); // 发送短信和赠送积分方法都会在新的线程中异步执行 sendSMS(); sendPoints(); long endTime = System.currentTimeMillis(); long duration = endTime - startTime; System.out.println("All tasks are submitted in " + duration + " milliseconds."); } @Async public void sendSMS() { // 发送短信的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Sending SMS..."); } @Async public void sendPoints() { // 赠送积分的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Sending Points..."); } }
使用@Async注解的异步执行可以使得发送短信和赠送积分的方法在不同的线程中并发执行,从而减少总时长。需要注意的是,通过异步执行,总时长可能会有所下降,但也受到系统资源和任务量的限制,不能保证总时长绝对减少。在实际使用中,需要根据具体情况进行线程池的配置和任务量的调整,以获得最佳的性能效果。
5、消息队列(下边代码均基于已经做好rabbitMQ配置)
import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableAsync @EnableRabbit @EnableScheduling public class Client { public static void main(String[] args) { SpringApplication.run(Client.class, args); } @Bean public Queue smsQueue() { return new Queue("sms"); } @Bean public Queue pointsQueue() { return new Queue("points"); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @Component class Scheduler { private final TaskSender taskSender; private final TaskReceiver taskReceiver; @Autowired public Scheduler(TaskSender taskSender, TaskReceiver taskReceiver) { this.taskSender = taskSender; this.taskReceiver = taskReceiver; } @Scheduled(fixedRate = 1000) public void scheduleTasks() { // 调用异步任务 CompletableFuture<Void> smsFuture = taskSender.sendSMS(); CompletableFuture<Void> pointsFuture = taskSender.sendPoints(); // 等待所有异步任务完成 CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(smsFuture, pointsFuture); try { // 等待所有异步任务完成,并打印总时长 allTasksFuture.get(); long duration = taskReceiver.getDuration(); System.out.println("Total duration: " + duration + " milliseconds"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } // @Scheduled(fixedDelay = 5000) // public void printTotalDuration() { // long duration = taskReceiver.getDuration(); // System.out.println("Total duration: " + duration + " milliseconds"); // }
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component class TaskReceiver { private long startTime; private long endTime; @RabbitListener(queues = "sms") public void receiveSMS(String message) { startTime = System.currentTimeMillis(); // 发送短信的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } endTime = System.currentTimeMillis(); System.out.println(message); } @RabbitListener(queues = "points") public void receivePoints(String message) { startTime = System.currentTimeMillis(); // 赠送积分的逻辑 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } endTime = System.currentTimeMillis(); System.out.println(message); } public long getDuration() { return endTime - startTime; } }
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.CompletableFuture; @Component class TaskSender { @Autowired private RabbitTemplate rabbitTemplate; public CompletableFuture<Void> sendSMS() { rabbitTemplate.convertAndSend("sms", "Sending SMS..."); return CompletableFuture.completedFuture(null); } public CompletableFuture<Void> sendPoints() { rabbitTemplate.convertAndSend("points", "Sending Points..."); return CompletableFuture.completedFuture(null); } }
总结
异步编程是一种强大的编程技术,它可以在处理并发任务时显著提升性能和响应性。通过将耗时的操作放在后台执行,异步编程使得主线程能够继续执行其他任务而不需要等待。
在现代应用程序中,异步编程已经变得愈发重要,特别是在涉及网络请求、数据库操作、IO操作或计算密集型任务时。通过使用异步编程,我们可以充分利用多核处理器和并发性,最大程度地提高程序的效率。
然而,异步编程也需要谨慎使用,特别是在处理复杂的并发逻辑时。在异步代码中,需要注意处理线程安全性和竞态条件,以避免潜在的并发问题。