蚂蚁:实现异步的8种方式,你知道几个? 上

简介: 蚂蚁:实现异步的8种方式,你知道几个?上


一、🌈前言

异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:「发送短信、邮件、异步更新等」 ,这些都是典型的可以通过异步实现的场景。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、异步的八种实现方式

  1. 线程Thread
  2. Future
  3. 异步框架CompletableFuture
  4. Spring注解@Async
  5. Spring ApplicationEvent事件
  6. 消息队列
  7. 第三方异步框架,比如Hutool的ThreadUtil
  8. Guava异步

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

三、什么是异步?

首先我们先看一个常见的用户下单的场景:

什么是异步

在同步操作中,我们执行到 「发送短信」 的时候,我们必须等待这个方法彻底执行完才能执行 「赠送积分」 这个操作,如果 「赠送积分」 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。

实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分发送短信这两个操作能够同时进行,比如:

异步

这就是所谓的异步,是不是非常简单,下面就说说异步的几种实现方式吧。

四、异步编程

4.1 线程异步

public class AsyncThread extends Thread {
    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }
    public static void main(String[] args) {
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.run();
    }
}

当然如果每次都创建一个Thread线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:

private ExecutorService executorService = Executors.newCachedThreadPool();
public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("执行业务逻辑...");
        }
    });
}

可以将业务逻辑封装到RunnableCallable中,交由线程池来执行。

4.2 Future异步

@Slf4j
public class FutureManager {
    public String execute() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });
        //这里需要返回值时会阻塞主线程
        String result = future.get();
        log.info("Future get result: {}", result);
        return result;
    }
    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

输出结果:

 --- task start --- 
 --- task finish ---
 Future get result: this is future execute final result!!!

4.2.1 Future的不足之处

Future的不足之处的包括以下几点:

1️⃣ 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。2️⃣ Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。3️⃣ Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。

4.3 CompletableFuture实现异步

public class CompletableFutureCompose {
    /**
     * thenAccept子任务和父任务公用同一个线程
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }
    public static void main(String[] args) {
        thenRunAsync();
    }
}

我们不需要显式使用ExecutorService,CompletableFuture 内部使用了ForkJoinPool来处理异步任务,如果在某些业务场景我们想自定义自己的异步线程池也是可以的。

4.4 Spring的@Async异步

4.4.1 自定义异步线程池

/**
 * 线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async("taskName")
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定义线程池
     *
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

4.4.2 AsyncService

public interface AsyncService {
    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);
    MessageResult sendEmail(String email, String subject, String content);
}
@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    private IMessageHandler mesageHandler;
    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {
            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);
        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {
            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);
        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}

在实际项目中, 使用@Async调用线程池,推荐等方式是是使用自定义线程池的模式,不推荐直接使用@Async直接实现异步。

相关文章
|
Java
http访问springboot接口出现401 、403、 Forbidden 错误解决方法
http访问springboot接口出现401 、403、 Forbidden 错误解决方法
3083 0
操作系统:死锁资源的计算
操作系统:死锁资源的计算
2323 0
|
9月前
|
人工智能
LangGraph:构建多代理动态工作流的开源框架,支持人工干预、循环、持久性等复杂工作流自动化
LangGraph 是一个基于图结构的开源框架,专为构建状态化、多代理系统设计,支持循环、持久性和人工干预,适用于复杂的工作流自动化。
1272 12
LangGraph:构建多代理动态工作流的开源框架,支持人工干预、循环、持久性等复杂工作流自动化
|
弹性计算 NoSQL 安全
如何在阿里云服务器上安装Redis数据库
如何在阿里云服务器上安装Redis数据库
10629 2
OWASP ESAPI 预防XSS跨站脚本攻击_xss攻击引入esapi(1)
OWASP ESAPI 预防XSS跨站脚本攻击_xss攻击引入esapi(1)
|
Java Nacos 开发工具
nacos服务端2.0.3 问题之启动报错如何解决
Nacos是一个开源的、易于部署的动态服务发现、配置管理和服务管理平台,旨在帮助微服务架构下的应用进行快速配置更新和服务治理;在实际运用中,用户可能会遇到各种报错,本合集将常见的Nacos报错问题进行归纳和解答,以便使用者能够快速定位和解决这些问题。
2110 112
|
存储 JSON 程序员
Python基础知识点总结
本文包括python基本知识:简单数据结构,数据结构类型(可变:列表,字典,集合,不可变:数值类型,字符串,元组),分支循环和控制流程,类和函数,文件处理和异常等等。
3466 2
Python基础知识点总结
|
Kubernetes Nacos 数据库
nacos常见问题之修改nacos密码后报错403如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
MIKE21 MIKE软件 教程 0.1 软件介绍与教学目录
MIKE 21 功能位于MIKE Zero集成平台下,是DHI公司的软件产品之一。
|
机器学习/深度学习 人工智能 算法
多智能体强化学习(二) MAPPO算法详解
多智能体强化学习(二) MAPPO算法详解
2906 0