Java中的异步编程方案总结

简介: Java中的异步编程是一种能够提高程序性能和响应速度的技术。它通过将耗时的操作放在单独的线程中,让主线程继续执行其他任务,从而实现并发处理和异步执行。在Java中,异步编程常用的方式有多线程、Future和CompletableFuture等。在实际应用中,异步编程可以优化网络请求、数据库操作等IO密集型任务的性能,提高程序的响应速度和吞吐量。虽然异步编程可以带来许多好处,但同时也涉及到一些问题,比如线程安全、回调地狱等。因此,在使用异步编程时需要注意合理地设计和管理线程,确保程序的正确性和可维护性。

一、概述

Java中的异步编程是一种能够提高程序性能和响应速度的技术。它通过将耗时的操作放在单独的线程中,让主线程继续执行其他任务,从而实现并发处理和异步执行。在Java中,异步编程常用的方式有多线程、Future和CompletableFuture等。在实际应用中,异步编程可以优化网络请求、数据库操作等IO密集型任务的性能,提高程序的响应速度和吞吐量。虽然异步编程可以带来许多好处,但同时也涉及到一些问题,比如线程安全、回调地狱等。因此,在使用异步编程时需要注意合理地设计和管理线程,确保程序的正确性和可维护性。

本文主要总结几种常见的异步编程方案,包括:(1)基于Thread的多线程实现;(2)实现CompletableFuture接口;(3)通过@Async注解实现;(4)基于ApplicationEvent事件订阅;(5)基于消息中间件;

32D60601-7D6B-4187-A49E-D7F11B1F2456

二、异步场景说明

异步编程是让程序并发运行的一种手段。它允许多个事件同时发生,当程序调用需要长时间运行的方法时,它不会阻塞当前的执行流程,程序可以继续运行。

他的核心思路是:采用多线程优化性能,将串行操作变成并行操作。异步模式设计的程序可以显著减少线程等待,从而在高吞吐量场景中,极大提升系统的整体性能,显著降低时延。

三、JDK原生方案

1.基于Thread的多线程

直接继承 Thread类 是创建异步线程最简单的方式。

首先,创建 Thread 子类,普通类或匿名内部类方式;然后创建子类实例;最后通过 start()方法启动线程。

public class ThreadTest implements Runnable{
   
   
    public static void main(String[] args) {
   
   
        Thread thread = new Thread(new ThreadTest());
        thread.start();
    }

    @Override
    public void run() {
   
   
        System.out.println(Thread.currentThread().getName());
    }
}

由于需要频繁的创建和销毁线程,这样的操作会对系统资源造成浪费,所以引入了线程池来管理多线性。以下代码的static ExecutorService executorService = Executors.newFixedThreadPool(3);就是创建线程池的操作。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
   
   
    static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
   
   

        for (int i = 0; i < 6; i++) {
   
   
            executorService.submit(() -> {
   
   
                System.out.println(Thread.currentThread().getName());
                try {
   
   
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
            });
        }
    }
}

上述方式虽然达到了多线程并行处理,但有些业务不仅仅要执行过程,还要获取执行后的结果。Java 从 1.5 版本开始,提供了 Callable 和 Future,可以在任务执行完毕之后得到任务执行结果。 当然也提供了其他功能,如:取消任务、查询任务是否完成等。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest {
   
   
    static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
   
   
        for (int i = 0; i < 6; i++) {
   
   
            Future future = executorService.submit(
                    new Callable() {
   
   
                        @Override
                        public String call() throws Exception {
   
   
                            System.out.println(Thread.currentThread().getName());
                            Thread.sleep((long) (1000 + Math.random() * 1000));
                            return Thread.currentThread().getName();
                        }
                    }
            );
            System.out.println("threadName = " + future);
        }
    }
}

2.实现CompletableFuture接口

通过Future来获取异步线程的消息有个弊端是他是同步阻塞的,是需要主线程通过future.get()方法来查询起结果的,为了解决这个问题,CompletableFuture可以通过回调的方式来处理计算结果,实现了异步非阻塞,性能更优。CompletableFuture可以实现的功能包括:(1)任务执行成功后,会回调某个执行成功的方法;(2)任务执行失败后,会回调某个执行失败的方法;

import java.util.concurrent.CompletableFuture;

/**
 * The type Completable futurn compose test.
 *
 * @author yangnk
 */
public class CompletableFuturnComposeTest {
   
   

    /**
     * Completable futurn compose.
     */
    public static void completableFuturnCompose() {
   
   
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
   
   
            try {
   
   
                Thread.sleep(3000);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return "hello";
        });
        completableFuture.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
   
   
            try {
   
   
                Thread.sleep(1000);
            } catch (InterruptedException e) {
   
   
                e.printStackTrace();
            }
            return "world";
        });

        completableFuture1.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture completableFuture2 = completableFuture1.thenApplyAsync(t ->{
   
   
            return "after " + t;
        });
        completableFuture2.whenComplete((s, throwable) -> {
   
   
            System.out.println(s);
        });

        CompletableFuture.allOf(completableFuture, completableFuture1).join();
        System.out.println("1." + System.currentTimeMillis());
    }

    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
   
   
        completableFuturnCompose();
        System.out.println("2." + System.currentTimeMillis());

    }
}

四、集成Spring的方案

1.基于@Async注解

SpringBoot框架还提供了一种基于注解的方案,该方案以方法体为界,使方法体内部的代码逻辑全部按异步方式执行。为了启用异步注解,首先需要使用@EnableAsync。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class SpringDemo1Application {
   
   

    public static void main(String[] args) {
   
   
        SpringApplication application = new SpringApplication(SpringDemo1Application.class);
        ConfigurableApplicationContext context =  application.run(args);
    }
}

然后再自定义线程池:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
public class TaskPoolConfig {
   
   

    @Bean("taskExecutor")
    public Executor taskExecutor() {
   
   
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量,默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

在异步处理的方法上,最后添加了注解 @Async。当调用 execute 方法时,通过自定义的线程池 taskExecutor,实现了对 execute 方法的异步化执行。这样可以提高程序的效率和性能。

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

interface AsyncService {
   
   

    String sendSms(String callPrefix, String mobile, String actionType, String content);
    String sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncTest implements AsyncService {
   
   

    @Override
    @Async("taskExecutor")
    public String sendSms(String callPrefix, String mobile, String actionType, String content) {
   
   
        try {
   
   

            Thread.sleep(1000);
            System.out.println( "发送sms成功");

        } catch (Exception e) {
   
   
            log.error("发送短信异常 -> ", e);
        }
        return "发送sms成功";
    }

    @Override
    @Async("taskExecutor")
    public String sendEmail(String email, String subject, String content) {
   
   
        try {
   
   

            Thread.sleep(1000);
            System.out.println( "发送email成功");

        } catch (Exception e) {
   
   
            log.error("发送email异常 -> ", e);
        }
        return "发送成功";
    }
}

注意事项:由于通过 @Async注解来实现异步调用本质上是通过AOP代理实现的,代理类调用自身的方法会失效,所以加了 @Async注解的方法无法直接调用自己类中的方法,这样操作会调用失败。

总结一下在Springboot中基于@Async注解来实现异步调用:

  1. 调用异步方法类上或者启动类加上注解 @EnableAsync
  2. 在需要被异步调用的方法外加上 @Async
  3. 所使用的 @Async 注解方法的类对象应该是 Spring 容器管理的 bean 对象;

2.基于ApplicationEvent事件订阅

事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。本质上,这是标准的观察者设计模式。 ApplicationEvent 是由 Spring 提供的所有 Event 类的基类。

首先,需要自定义业务事件子类,继承自 ApplicationEvent,通过泛型注入业务模型参数类。相当于 MQ 的消息体。

import org.springframework.context.ApplicationEvent;

//自定义事件
public class ApplicationEventTest extends ApplicationEvent {
   
   

    public ApplicationEventTest(Object source) {
   
   
        super(source);
    }

    /**
     * 事件处理事项
     * @param msg
     */
    public void printMsg(String msg)
    {
   
   
        System.out.println("监听到事件:" + ApplicationEventTest.class + ", 时间为:" + msg);
    }
}

然后编写事件监听器。ApplicationListener接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener。相当于 MQ 的消费端。

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Date;

//自定义事件监听器
@Component
public class ApplicationListenerTest implements ApplicationListener {
   
   

    @Override
    public void onApplicationEvent(ApplicationEventTest event) {
   
   
        event.printMsg(String.valueOf(new Date()));
    }
}

最后发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class ApplicationEventPubTest implements ApplicationContextAware {
   
   
    ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
   
   
        this.applicationContext = applicationContext;
    }

    @RequestMapping("/hello1")
    public String index() {
   
   
        applicationContext.publishEvent(new ApplicationEventTest(new Object()));
        return "Hello World.";
    }
}

五、消息中间件实现

异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。

消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式和发布订阅模式两种。消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。 当然市面上消息队列框架非常多,常见的有 RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等。借助消息队列这个中间件可以高效的实现异步编程。

六、代码实现(未更新到博客)

GitHub:https://github.com/yangnk/SpringBoot_Learning/tree/master/SpringBootExample/src/main/java/com/yangnk/async


参考资料

  1. 为什么都不建议直接使用 @Async 注解实现异步?:https://juejin.cn/post/7099328896142671903#heading-6
  2. Java实现异步编程的8种方式:https://juejin.cn/post/7165147306688249870 (主要参考)
  3. 一文带你彻底了解 Java 异步编程:https://xie.infoq.cn/article/37052813d58d4a3a41714c1d0
  4. 7 种 Java 异步编程实现方式!:https://zhuanlan.zhihu.com/p/572874448
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
小程序 数据可视化 Java
Java+后端Spring boot 开发的全套UWB定位方案,0.1米高精度定位系统源码
UWB定位系统由硬件定位设备、定位引擎和应用软件组成。该定位系统应用软件支持PC端和移动端访问,并提供位置实时显示、历史轨迹回放、人员考勤、电子围栏、行为分析、智能巡检等功能。定位精度高达10cm,同时具备高动态、高容量、低功耗的优点。应用场景包括:隧道、化工、工厂、煤矿、工地、电厂、养老、展馆、整车、机房、机场等。
336 8
|
5月前
|
机器学习/深度学习 JSON Java
Java调用Python的5种实用方案:从简单到进阶的全场景解析
在机器学习与大数据融合背景下,Java与Python协同开发成为企业常见需求。本文通过真实案例解析5种主流调用方案,涵盖脚本调用到微服务架构,助力开发者根据业务场景选择最优方案,提升开发效率与系统性能。
1362 0
|
6月前
|
Cloud Native 前端开发 Java
WebAssembly 与 Java 结合的跨语言协作方案及性能提升策略研究
本文深入探讨了WebAssembly与Java的结合方式,介绍了编译Java为Wasm模块、在Java中运行Wasm、云原生集成等技术方案,并通过金融分析系统的应用实例展示了其高性能、低延迟、跨平台等优势。结合TeaVM、JWebAssembly、GraalVM、Wasmer Java等工具,帮助开发者提升应用性能与开发效率,适用于Web前端、服务器端及边缘计算等场景。
255 0
|
5月前
|
缓存 监控 Kubernetes
Java虚拟机内存溢出(Java Heap Space)问题处理方案
综上所述, 解决Java Heap Space溢出需从多角度综合施策; 包括但不限于配置调整、代码审查与优化以及系统设计层面改进; 同样也不能忽视运行期监控与预警设置之重要性; 及早发现潜在风险点并采取相应补救手段至关重要.
795 17
|
5月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
7月前
|
缓存 Java 数据库
Java 项目分层架构实操指南及长尾关键词优化方案
本指南详解基于Spring Boot与Spring Cloud的Java微服务分层架构,以用户管理系统为例,涵盖技术选型、核心代码实现、服务治理及部署实践,助力掌握现代化Java企业级开发方案。
358 2
|
8月前
|
安全 Java Docker
Docker 部署 Java 应用实战指南与长尾优化方案
本文详细介绍了Docker容器化部署Java应用的最佳实践。首先阐述了采用多阶段构建和精简JRE的镜像优化技术,可将镜像体积减少60%。其次讲解了资源配置、健康检查、启动优化等容器化关键配置,并演示了Spring Boot微服务的多模块构建与Docker Compose编排方案。最后深入探讨了Kubernetes生产部署、监控日志集成、灰度发布策略以及性能调优和安全加固措施,为Java应用的容器化部署提供了完整的解决方案指南。文章还包含大量可落地的代码示例,涵盖从基础到高级的生产环境实践。
483 3
|
7月前
|
JavaScript Java Go
Go、Node.js、Python、PHP、Java五种语言的直播推流RTMP协议技术实施方案和思路-优雅草卓伊凡
Go、Node.js、Python、PHP、Java五种语言的直播推流RTMP协议技术实施方案和思路-优雅草卓伊凡
548 0
|
8月前
|
前端开发 数据可视化 Java
开发 JavaFX 与 Java Swing 桌面应用的实用技巧与实践方案
本文介绍了Java桌面应用开发的技术选型与JavaFX实战方案。首先对比了JavaFX和Swing的特点,推荐JavaFX更适合现代UI需求。重点讲解了JavaFX 19+的技术升级,包括模块化开发(module-info.java配置)和响应式UI设计(CSS样式管理)。在数据访问层展示了JDBC 4.3的集成和异步加载实现。高级UI组件部分演示了自定义表格和图表可视化的开发方法。最后介绍了MVVM架构的实现,包括视图模型的数据绑定和FXML控制器的集成,为开发者提供了完整的JavaFX桌面应用开发解决方案。
708 0
|
10月前
|
数据采集 自然语言处理 JavaScript
Playwright多语言生态:跨Python/Java/.NET的统一采集方案
随着数据采集需求的增加,传统爬虫工具如Selenium、Jsoup等因语言割裂、JS渲染困难及代理兼容性差等问题,难以满足现代网站抓取需求。微软推出的Playwright框架,凭借多语言支持(Python/Java/.NET/Node.js)、统一API接口和优异的JS兼容性,解决了跨语言协作、动态页面解析和身份伪装等痛点。其性能优于Selenium与Puppeteer,在学术数据库(如Scopus)抓取中表现出色。行业应用广泛,涵盖高校科研、大型数据公司及AI初创团队,助力构建高效稳定的爬虫系统。
614 2
Playwright多语言生态:跨Python/Java/.NET的统一采集方案