Java中的异步编程方案总结

简介: Java中的异步编程是一种能够提高程序性能和响应速度的技术。它通过将耗时的操作放在单独的线程中,让主线程继续执行其他任务,从而实现并发处理和异步执行。在Java中,异步编程常用的方式有多线程、Future和CompletableFuture等。在实际应用中,异步编程可以优化网络请求、数据库操作等IO密集型任务的性能,提高程序的响应速度和吞吐量。虽然异步编程可以带来许多好处,但同时也涉及到一些问题,比如线程安全、回调地狱等。因此,在使用异步编程时需要注意合理地设计和管理线程,确保程序的正确性和可维护性。

一、概述

Java中的异步编程是一种能够提高程序性能和响应速度的技术。它通过将耗时的操作放在单独的线程中,让主线程继续执行其他任务,从而实现并发处理和异步执行。在Java中,异步编程常用的方式有多线程、Future和CompletableFuture等。在实际应用中,异步编程可以优化网络请求、数据库操作等IO密集型任务的性能,提高程序的响应速度和吞吐量。虽然异步编程可以带来许多好处,但同时也涉及到一些问题,比如线程安全、回调地狱等。因此,在使用异步编程时需要注意合理地设计和管理线程,确保程序的正确性和可维护性。

本文主要总结几种常见的异步编程方案,包括:(1)基于Thread的多线程实现;(2)实现CompletableFuture接口;(3)通过@Async注解实现;(4)基于ApplicationEvent事件订阅;(5)基于消息中间件;

32D60601-7D6B-4187-A49E-D7F11B1F2456

二、异步场景说明

异步编程是让程序并发运行的一种手段。它允许多个事件同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行。

他的核心思路是:采用多线程优化性能,将串行操作变成并行操作。异步模式设计的程序可以显著减少线程等待,从而在高吞吐量场景中,极大提升系统的整体性能,显著降低时延。

三、JDK原生方案

1.基于Thread的多线程

直接继承 Thread类 是创建异步线程最简单的方式。

首先,创建 Thread 子类,普通类或匿名内部类方式;然后创建子类实例;最后通过 start()方法启动线程。

public class ThreadTest implements Runnable{
   
   
    public static void main(String[] args) {
   
   
        Thread thread = new Thread(new ThreadTest());
        thread.start();
    }

    @Override
    public void run() {
   
   
        System.out.println(Thread.currentThread().getName());
    }
}

由于需要频繁的创建和销毁线程,这样的操作会对系统资源造成浪费,所以引入了线程池来管理多线性。以下代码的static ExecutorService executorService = Executors.newFixedThreadPool(3);就是创建线程池的操作。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
   
   
    static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
   
   

        for (int i = 0; i < 6; i++) {
   
   
            executorService.submit(() -> {
   
   
                System.out.println(Thread.currentThread().getName());
                try {
   
   
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
            });
        }
    }
}

上述方式虽然达到了多线程并行处理,但有些业务不仅仅要执行过程,还要获取执行后的结果。Java 从 1.5 版本开始,提供了 Callable 和 Future,可以在任务执行完毕之后得到任务执行结果。 当然也提供了其他功能,如:取消任务、查询任务是否完成等。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest {
   
   
    static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
   
   
        for (int i = 0; i < 6; i++) {
   
   
            Future future = executorService.submit(
                    new Callable() {
   
   
                        @Override
                        public String call() throws Exception {
   
   
                            System.out.println(Thread.currentThread().getName());
                            Thread.sleep((long) (1000 + Math.random() * 1000));
                            return Thread.currentThread().getName();
                        }
                    }
            );
            System.out.println("threadName = " + future);
        }
    }
}

2.实现CompletableFuture接口

通过Future来获取异步线程的消息有个弊端是他是同步阻塞的,是需要主线程通过future.get()方法来查询起结果的,为了解决这个问题,CompletableFuture可以通过回调的方式来处理计算结果,实现了异步非阻塞,性能更优。CompletableFuture可以实现的功能包括:(1)任务执行成功后,会回调某个执行成功的方法;(2)任务执行失败后,会回调某个执行失败的方法;

import java.util.concurrent.CompletableFuture;

/**
 * The type Completable futurn compose test.
 *
 * @author yangnk
 */
public class CompletableFuturnComposeTest {
   
   

    /**
     * Completable futurn compose.
     */
    public static void completableFuturnCompose() {
   
   
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
   
   
            try {
   
   
                Thread.sleep(3000);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return "hello";
        });
        completableFuture.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
   
   
            try {
   
   
                Thread.sleep(1000);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return "world";
        });

        completableFuture1.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture completableFuture2 = completableFuture1.thenApplyAsync(t ->{
   
   
            return "after " + t;
        });
        completableFuture2.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture.allOf(completableFuture, completableFuture1).join();
        System.out.println("1." + System.currentTimeMillis());
    }

    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
   
   
        completableFuturnCompose();
        System.out.println("2." + System.currentTimeMillis());

    }
}

四、集成Spring的方案

1.基于@Async注解

SpringBoot框架还提供了一种基于注解的方案,该方案以方法体为界,使方法体内部的代码逻辑全部按异步方式执行。为了启用异步注解,首先需要使用@EnableAsync。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class SpringDemo1Application {
   
   

    public static void main(String[] args) {
   
   
        SpringApplication application = new SpringApplication(SpringDemo1Application.class);
        ConfigurableApplicationContext context =  application.run(args);
    }
}

然后再自定义线程池:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
public class TaskPoolConfig {
   
   

    @Bean("taskExecutor")
    public Executor taskExecutor() {
   
   
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

在异步处理的方法上,最后添加了注解 @Async。当调用 execute 方法时,通过自定义的线程池 taskExecutor,实现了对 execute 方法的异步化执行。这样可以提高程序的效率和性能。

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

interface AsyncService {
   
   

    String sendSms(String callPrefix, String mobile, String actionType, String content);
    String sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncTest implements AsyncService {
   
   

    @Override
    @Async("taskExecutor")
    public String sendSms(String callPrefix, String mobile, String actionType, String content) {
   
   
        try {
   
   

            Thread.sleep(1000);
            System.out.println( "发送sms成功");

        } catch (Exception e) {
   
   
            log.error("发送短信异常 -> ", e);
        }
        return "发送sms成功";
    }

    @Override
    @Async("taskExecutor")
    public String sendEmail(String email, String subject, String content) {
   
   
        try {
   
   

            Thread.sleep(1000);
            System.out.println( "发送email成功");

        } catch (Exception e) {
   
   
            log.error("发送email异常 -> ", e);
        }
        return "发送成功";
    }
}

注意事项:由于通过 @Async注解来实现异步调用本质上是通过AOP代理实现的,代理类调用自身的方法会失效,所以加了 @Async注解的方法无法直接调用自己类中的方法,这样操作会调用失败。

总结一下在Springboot中基于@Async注解来实现异步调用:

  1. 调用异步方法类上或者启动类加上注解 @EnableAsync
  2. 在需要被异步调用的方法外加上 @Async
  3. 所使用的 @Async 注解方法的类对象应该是 Spring 容器管理的 bean 对象;

2.基于ApplicationEvent事件订阅

事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。本质上,这是标准的观察者设计模式。 ApplicationEvent 是由 Spring 提供的所有 Event 类的基类。

首先,需要自定义业务事件子类,继承自 ApplicationEvent,通过泛型注入业务模型参数类。相当于 MQ 的消息体。

import org.springframework.context.ApplicationEvent;

//自定义事件
public class ApplicationEventTest extends ApplicationEvent {
   
   

    public ApplicationEventTest(Object source) {
   
   
        super(source);
    }

    /**
     * 事件处理事项
     * @param msg
     */
    public void printMsg(String msg)
    {
   
   
        System.out.println("监听到事件:" + ApplicationEventTest.class + ", 时间为:" + msg);
    }
}

然后编写事件监听器。ApplicationListener接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener。相当于 MQ 的消费端。

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Date;

//自定义事件监听器
@Component
public class ApplicationListenerTest implements ApplicationListener {
   
   

    @Override
    public void onApplicationEvent(ApplicationEventTest event) {
   
   
        event.printMsg(String.valueOf(new Date()));
    }
}

最后发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class ApplicationEventPubTest implements ApplicationContextAware {
   
   
    ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
   
   
        this.applicationContext = applicationContext;
    }

    @RequestMapping("/hello1")
    public String index() {
   
   
        applicationContext.publishEvent(new ApplicationEventTest(new Object()));
        return "Hello World.";
    }
}

五、消息中间件实现

异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。

消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式和发布订阅模式两种。消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。 当然市面上消息队列框架非常多,常见的有 RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等。借助消息队列这个中间件可以高效的实现异步编程。

六、代码实现(未更新到博客)

GitHub:https://github.com/yangnk/SpringBoot_Learning/tree/master/SpringBootExample/src/main/java/com/yangnk/async


参考资料

  1. 为什么都不建议直接使用 @Async 注解实现异步?:https://juejin.cn/post/7099328896142671903#heading-6
  2. Java实现异步编程的8种方式:https://juejin.cn/post/7165147306688249870 (主要参考)
  3. 一文带你彻底了解 Java 异步编程:https://xie.infoq.cn/article/37052813d58d4a3a41714c1d0
  4. 7 种 Java 异步编程实现方式!:https://zhuanlan.zhihu.com/p/572874448
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
小程序 数据可视化 Java
Java+后端Spring boot 开发的全套UWB定位方案,0.1米高精度定位系统源码
UWB定位系统由硬件定位设备、定位引擎和应用软件组成。该定位系统应用软件支持PC端和移动端访问,并提供位置实时显示、历史轨迹回放、人员考勤、电子围栏、行为分析、智能巡检等功能。定位精度高达10cm,同时具备高动态、高容量、低功耗的优点。应用场景包括:隧道、化工、工厂、煤矿、工地、电厂、养老、展馆、整车、机房、机场等。
124 8
|
8月前
|
Java
1276. 不浪费原料的汉堡制作方案 --力扣 --JAVA
圣诞活动预热开始啦,汉堡店推出了全新的汉堡套餐。为了避免浪费原料,请你帮他们制定合适的制作计划。 给你两个整数 tomatoSlices 和 cheeseSlices,分别表示番茄片和奶酪片的数目。不同汉堡的原料搭配如下: 巨无霸汉堡:4 片番茄和 1 片奶酪 小皇堡:2 片番茄和 1 片奶酪 请你以 [total_jumbo, total_small]([巨无霸汉堡总数,小皇堡总数])的格式返回恰当的制作方案,使得剩下的番茄片 tomatoSlices 和奶酪片 cheeseSlices 的数量都是 0。 如果无法使剩下的番茄片 tomatoSlices 和奶酪片 cheeseSlic
64 0
|
8月前
|
SQL Java 应用服务中间件
Java项目防止SQL注入的四种方案
Java项目防止SQL注入的四种方案
150 0
|
设计模式 Java
Java克隆方式避免频繁创建对象优化方案
Java克隆方式避免频繁创建对象优化方案
124 0
|
存储 缓存 Java
Java 序列化方案
在Java中,对象序列化是将一个对象的状态信息转换为字节流的过程,以便将其存储到文件或传输到另一个计算机。反序列化是将这个字节流转换回对象的过程。对象序列化和反序列化在Java编程中具有广泛的应用,例如远程方法调用、数据持久化和缓存等等。
80 0
|
2天前
|
SQL Java 数据库连接
【潜意识Java】Java中JDBC过时方法的替代方案以及JDBC为什么过时详细分析
本文介绍了JDBC中一些常见过时方法及其替代方案。
21 5
|
29天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
160 2
|
6月前
|
安全 Java 数据安全/隐私保护
在Java项目中集成单点登录(SSO)方案
在Java项目中集成单点登录(SSO)方案
|
7月前
|
安全 Java UED
深度解析Java中方法内的异步调用实践与应对方案
深度解析Java中方法内的异步调用实践与应对方案
172 1
|
6月前
|
Java 关系型数据库 MySQL
GraalVM 静态编译下 OTel Java Agent 的自动增强方案与实现
在 2024 OpenTelemetry Community Day 会议中,阿里云可观测工程师张乎兴(望陶)和饶子昊(铖朴)为大家带来了《GraalVM 静态编译下 OTel Java Agent 的自动增强方案与实现》的演讲分享,介绍阿里云在相关领域的探索方案,本文是相关分享对应的中文整理。
289 20