实现异步编程的方式

简介: 实现异步编程的方式

背景

异步编程是一种解决并发问题的方式,它允许程序在执行某个任务时,不需要等待该任务完成,而是可以继续执行后续的任务。通过异步编程,我们可以将长时间运行的任务交给其他线程或进程来处理,从而提高程序的性能和响应性。

异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件、异步更新等,这些都是典型的可以通过异步实现的场景。

过程

异步实现的方式

1、线程Thread

2、Future

3、异步框架CompletableFuture

4、消息队列

什么是异步

在同步操作中,我们执行到 发送短信 的时候,我们必须等待这个方法彻底执行完才能执行 赠送积分 这个操作,如果 赠送积分 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。

实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分和发送短信这两个操作能够同时进行,比如:

这就是所谓的异步,是不是非常简单,下面就说说异步的几种实现方式吧。

详解异步的几种方式

1、线程Thread

package asynchronous.Eight;
public class AsyncThread1 {
    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        sendSMS();
        sendPoints();
        long endTime = System.currentTimeMillis();
        System.out.println("--costTime"+(endTime-startTime)+"ms");
    }
//    public static void main(String[] args) {
//
//        long startTime = System.currentTimeMillis();
//        // 创建并启动两个线程
//        Thread smsThread = new Thread(() -> {
//            try {
//                sendSMS();
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//        });
//        Thread pointsThread = new Thread(() -> {
//            try {
//                sendPoints();
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//        });
//
//        smsThread.start();
//        pointsThread.start();
//
//        try {
//            // 等待两个线程执行结束
//            smsThread.join();
//            pointsThread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//
//        long endTime = System.currentTimeMillis();
//        System.out.println("All tasks are completed.");
//        System.out.println("--costTime"+(endTime-startTime)+"ms");
//
//    }
    // 假设以下是发送短信和赠送积分的方法
    public static void sendSMS() throws InterruptedException {
        // 发送短信的逻辑
        Thread.sleep(300);
        System.out.println("Sending SMS...");
    }
    public static void sendPoints() throws InterruptedException {
        // 赠送积分的逻辑
        Thread.sleep(300);
        System.out.println("Sending Points...");
    }
}

2、Future

package asynchronous.Eight;
import java.util.concurrent.*;
public class FutureManager2 {
        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            ExecutorService executor = Executors.newFixedThreadPool(2);
            // 提交两个任务并获取Future对象
            Future<Void> smsFuture = executor.submit(() -> {
                sendSMS();
                return null;
            });
            Future<Void> pointsFuture = executor.submit(() -> {
                sendPoints();
                return null;
            });
            try {
                // 等待两个任务执行结束
                smsFuture.get();
                pointsFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                // 关闭线程池
                executor.shutdown();
            }
            System.out.println("All tasks are completed.");
            long endTime = System.currentTimeMillis();
        System.out.println("--costTime"+(endTime-startTime)+"ms");
        }
        // 假设以下是发送短信和赠送积分的方法
        public static void sendSMS() throws InterruptedException {
            // 发送短信的逻辑
            Thread.sleep(300);
            System.out.println("Sending SMS...");
        }
        public static void sendPoints() throws InterruptedException {
            Thread.sleep(300);
            // 赠送积分的逻辑
            System.out.println("Sending Points...");
        }
}

Future的不足之处的包括以下几点:

1️⃣ 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。

2️⃣ Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。

3️⃣ Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。

3、异步框架CompleteableFuture

package asynchronous.Eight;
import java.util.concurrent.CompletableFuture;
public class CompletableFutureCompose3 {
        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            // 创建并执行两个CompletableFuture任务
            CompletableFuture<Void> smsFuture = CompletableFuture.runAsync(() -> sendSMS());
            CompletableFuture<Void> pointsFuture = CompletableFuture.runAsync(() -> sendPoints());
            // 等待两个任务执行结束
            CompletableFuture<Void> allTasks = CompletableFuture.allOf(smsFuture, pointsFuture);
            // 在所有任务完成后执行回调
            allTasks.thenRun(() -> {
                long endTime = System.currentTimeMillis();
                long duration = endTime - startTime;
                System.out.println("All tasks are completed in " + duration + " milliseconds.");
            });
            // 阻塞直到所有任务完成
            allTasks.join();
        }
        // 假设以下是发送短信和赠送积分的方法
        public static void sendSMS() {
            // 发送短信的逻辑
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Sending SMS...");
        }
        public static void sendPoints() {
            // 赠送积分的逻辑
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Sending Points...");
    }
}

4、@Async

package asynchronous.Eight;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@EnableAsync
public class MainAnnotation4 implements AsyncConfigurer {
    public static void main(String[] args) {
        MainAnnotation4 main = new MainAnnotation4();
        main.runAsyncTasks();
    }
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncThread-");
        executor.initialize();
        return executor;
    }
    public void runAsyncTasks() {
        long startTime = System.currentTimeMillis();
        // 发送短信和赠送积分方法都会在新的线程中异步执行
        sendSMS();
        sendPoints();
        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;
        System.out.println("All tasks are submitted in " + duration + " milliseconds.");
    }
    @Async
    public void sendSMS() {
        // 发送短信的逻辑
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Sending SMS...");
    }
    @Async
    public void sendPoints() {
        // 赠送积分的逻辑
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Sending Points...");
    }
}

使用@Async注解的异步执行可以使得发送短信和赠送积分的方法在不同的线程中并发执行,从而减少总时长。需要注意的是,通过异步执行,总时长可能会有所下降,但也受到系统资源和任务量的限制,不能保证总时长绝对减少。在实际使用中,需要根据具体情况进行线程池的配置和任务量的调整,以获得最佳的性能效果。

5、消息队列(下边代码均基于已经做好rabbitMQ配置)

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableAsync
@EnableRabbit
@EnableScheduling
public class Client {
    public static void main(String[] args) {
        SpringApplication.run(Client.class, args);
    }
    @Bean
    public Queue smsQueue() {
        return new Queue("sms");
    }
    @Bean
    public Queue pointsQueue() {
        return new Queue("points");
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Component
class Scheduler {
    private final TaskSender taskSender;
    private final TaskReceiver taskReceiver;
    @Autowired
    public Scheduler(TaskSender taskSender, TaskReceiver taskReceiver) {
        this.taskSender = taskSender;
        this.taskReceiver = taskReceiver;
    }
    @Scheduled(fixedRate = 1000)
    public void scheduleTasks() {
        // 调用异步任务
        CompletableFuture<Void> smsFuture = taskSender.sendSMS();
        CompletableFuture<Void> pointsFuture = taskSender.sendPoints();
        // 等待所有异步任务完成
        CompletableFuture<Void> allTasksFuture = CompletableFuture.allOf(smsFuture, pointsFuture);
        try {
            // 等待所有异步任务完成,并打印总时长
            allTasksFuture.get();
            long duration = taskReceiver.getDuration();
            System.out.println("Total duration: " + duration + " milliseconds");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
//    @Scheduled(fixedDelay = 5000)
//    public void printTotalDuration() {
//        long duration = taskReceiver.getDuration();
//        System.out.println("Total duration: " + duration + " milliseconds");
//    }
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
class TaskReceiver {
    private long startTime;
    private long endTime;
    @RabbitListener(queues = "sms")
    public void receiveSMS(String message) {
        startTime = System.currentTimeMillis();
        // 发送短信的逻辑
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        endTime = System.currentTimeMillis();
        System.out.println(message);
    }
    @RabbitListener(queues = "points")
    public void receivePoints(String message) {
        startTime = System.currentTimeMillis();
        // 赠送积分的逻辑
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        endTime = System.currentTimeMillis();
        System.out.println(message);
    }
    public long getDuration() {
        return endTime - startTime;
    }
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
class TaskSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public CompletableFuture<Void> sendSMS() {
        rabbitTemplate.convertAndSend("sms", "Sending SMS...");
        return CompletableFuture.completedFuture(null);
    }
    public CompletableFuture<Void> sendPoints() {
        rabbitTemplate.convertAndSend("points", "Sending Points...");
        return CompletableFuture.completedFuture(null);
    }
}

总结

异步编程是一种强大的编程技术,它可以在处理并发任务时显著提升性能和响应性。通过将耗时的操作放在后台执行,异步编程使得主线程能够继续执行其他任务而不需要等待。

在现代应用程序中,异步编程已经变得愈发重要,特别是在涉及网络请求、数据库操作、IO操作或计算密集型任务时。通过使用异步编程,我们可以充分利用多核处理器和并发性,最大程度地提高程序的效率。

然而,异步编程也需要谨慎使用,特别是在处理复杂的并发逻辑时。在异步代码中,需要注意处理线程安全性和竞态条件,以避免潜在的并发问题。


相关文章
|
前端开发
promis:异步编程
promis:异步编程
49 0
|
2月前
|
JavaScript
异步编程
【10月更文挑战第26天】
31 2
|
1月前
|
Java API Spring
Java实现异步编程的几种方式
通过本文的介绍,我们了解了在Java中实现异步编程的几种常用方式。每种方法都有其优点和适用场景,具体选择哪种方式应根据实际需求和场景决定。如果任务较简单,可以使用 `Thread`或 `ExecutorService`;如果需要处理复杂的异步流程,可以考虑使用 `CompletableFuture`或Reactive编程框架。希望本文对您理解和实现Java异步编程有所帮助。
39 1
|
6月前
|
JSON 前端开发 JavaScript
在JavaScript中,异步编程是一种处理非阻塞操作(如网络请求、文件读写等)的重要技术
【6月更文挑战第12天】JavaScript中的异步编程通过Promise和async/await处理非阻塞操作。Promise管理异步操作的三种状态,防止回调地狱,支持链式调用和并行处理。async/await是ES8引入的语法糖,使异步代码更像同步代码,提高可读性。两者结合使用能更高效地处理复杂异步场景。
41 3
|
2月前
|
消息中间件 前端开发 JavaScript
探索JavaScript中的事件循环机制:异步编程的核心
【10月更文挑战第12天】探索JavaScript中的事件循环机制:异步编程的核心
41 1
|
6月前
|
安全 API 调度
异步编程中常见的问题和处理方式
【6月更文挑战第23天】在python中`asyncio` 提供PriorityQueue和LifoQueue,用于不同检索策略。异步编程需注意任务调度、错误处理和资源管理,以提高响应性和避免阻塞。
161 7
异步编程中常见的问题和处理方式
|
程序员 调度 C#
协程是什么?为何说协程具有同步的编程方式又具有异步的性能?
协程是什么?为何说协程具有同步的编程方式又具有异步的性能?
311 0
|
C#
C#异步编程
C#异步编程
187 0
|
C#
c#异步编程
c#异步编程原理,await asnyc的使用方法。异步编程是指在程序执行过程中,不需要等待某个操作完成,就可以继续执行后续的代码。
303 0
|
机器学习/深度学习 Java 编译器
2.2异步编程
.net core异步编程