Java多线程实战-CompletableFuture异步编程优化查询接口响应速度

简介: Java多线程实战-CompletableFuture异步编程优化查询接口响应速度

前言

在Web应用开发中,一个界面可能需要同时请求多个接口来获取不同信息。传统的做法是编写一个聚合接口同步获取这些数据,第二种方法是分多次请求来获取数据。这两种方式虽然简单直观,但效率比较低下,随着应用复杂度的增加,这种低效的做法将会带来严重的性能问题。


异步编程模型可以很好地解决这个问题。多个任务可以同时执行,互不影响,从而大幅提高应用的响应速度和吞吐量。Java 8 中引入的CompletableFuture为异步编程提供了强有力的支持,使得编写异步代码变得更加简单。本文将重点介绍如何利用CompletableFuture优化并发查询接口的响应速度。

实现思路


要优化并发查询接口的响应速度,传统的优化方式是通过多线程来并行执行多个查询任务。但这种做法存在一些缺陷:


创建和管理线程的开销较大,如果线程数量过多,会给系统带来很大的压力。

如果查询任务的执行时间不均匀,会导致部分线程需要长时间等待,资源利用率低下。

而CompletableFuture提供了一种更优雅、更高效的解决方案。其核心思路是:


每个查询任务都封装为一个CompletableFuture异步任务,由线程池并行执行。

通过CompletableFuture.allOf()方法等待所有异步任务完成。

最后从每个任务的结果中组装出最终需要的数据对象。

CompletableFuture快速入门

在JDK8以后,CompletableFuture提供了丰富的API用于异步编程,下面列举了一些最常见的用法:

1.创建CompletableFuture

有多种方式可以创建CompletableFuture:

// 从一个供给函数创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
 
// 从一个运行函数创建 
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("Hello"));
 
// 从一个已有的结果创建
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");


2.链式调用

CompletableFuture支持链式调用,可以方便地对异步结果进行转换和组合:

CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World") // 对结果进行转换
    .thenCompose(s -> getResult(s)); // 组合另一个异步操作


3.异常处理

通过exceptionally()方法可以对异常情况进行处理:

String result = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("error"); 
}).exceptionally(ex -> {
    // 处理异常
    return "Default Value";
}).get();


4.组合多个CompletableFuture

通过allOf,anyOf这两种方式我们可以让任务之间协同工作,join()和get()方法都是阻塞调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。


get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。


而 join() 会抛出未经检查的异常。

// 等待所有任务完成
CompletableFuture.allOf(future1, future2, future3).get();
CompletableFuture.allOf(future1, future2, future3).join();
 
// 只要任意一个任务完成即可  
CompletableFuture.anyOf(future1, future2, future3).get();
CompletableFuture.anyOf(future1, future2, future3).join();
 
// 规定超时时间,防止一直堵塞
CompletableFuture.allOf(future1, future2, future3).get(6, TimeUnit.SECONDS);

5.设置超时时间

我们可以通过下面的方式可以设置某个CompletableFuture的超时时间:

String result = CompletableFuture.supplyAsync(() -> "Hello")
                 .completeOnTimeout("Timeout!", 1, TimeUnit.SECONDS)
                 .get();

代码实现

1.初始化线程池

application.yaml配置文件

# 线程池配置
thread:
  pool:
    corePoolSize: 10
    maxPoolSize: 20
    queueCapacity: 100
    keepAliveSeconds: 60


线程池配置类ThreadPoolConfig

/**
 * @author Luckysj @刘仕杰
 * @description 线程池配置
 * @create 2024/03/19 21:43:57
 */
@Configuration
public class ThreadPoolConfig {
 
    @Value("${thread.pool.corePoolSize}")
    private int corePoolSize;
 
    @Value("${thread.pool.maxPoolSize}")
    private int maxPoolSize;
 
    @Value("${thread.pool.queueCapacity}")
    private int queueCapacity;
 
    @Value("${thread.pool.keepAliveSeconds}")
    private int keepAliveSeconds;
 
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}


2.封装响应信息聚合对象

我们这里模拟用户相关的界面,这里需要点赞数,粉丝数,文章数等信息

/**
 * @author Luckysj @刘仕杰
 * @description 信息聚合对象
 * @create 2024/03/19 21:48:13
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserBehaviorDataDTO {
 
    //用户ID
    private Long userId ;
 
    //发布文章数
    private Long articleCount ;
 
    //点赞数
    private Long likeCount ;
 
    //粉丝数
    private Long fansCount ;
 
    //消息数
    private Long msgCount ;
 
    //收藏数
    private Long collectCount ;
 
    //关注数
    private Long followCount ;
 
    //红包数
    private Long redBagCount ;
 
    // 卡券数
    private Long couponCount ;
 
}


3.通过CompletableFuture异步执行每一个查询操作

如下,我们定义了一个异步任务类,创建每一个查询操作的CompletableFuture异步任务放入线程中执行,并利用allOf等待全部任务执行完成,执行完成后组装查询信息到聚合对象中返回

/**
 * @author Luckysj @刘仕杰
 * @description 一个页面可能有多达10个左右的一个用户行为数据,我们可以通过多线程来提高查询速率
 * @create 2024/03/19 21:45:04
 */
@Slf4j
@Component
public class MyFutureTask {
    @Resource
    UserService userService;
 
    // 线程池
    @Resource
    private ExecutorService executor;
    public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {
        System.out.println("MyFutureTask的线程:" + Thread.currentThread());
        try {
            // 1.发布文章数
            CompletableFuture<Long> articleCountFT = CompletableFuture.supplyAsync(() -> userService.countArticleCountByUserId(userId), executor);
            // 2.点赞数
            CompletableFuture<Long> LikeCountFT = CompletableFuture.supplyAsync(() -> userService.countLikeCountByUserId(userId), executor);
            // 3.粉丝数
            CompletableFuture<Long> fansCountFT = CompletableFuture.supplyAsync(() -> userService.countFansCountByUserId(userId), executor);
            // 4.消息数
            CompletableFuture<Long> msgCountFT = CompletableFuture.supplyAsync(() -> userService.countMsgCountByUserId(userId), executor);
            // 5.收藏数
            CompletableFuture<Long> collectCountFT = CompletableFuture.supplyAsync(() -> userService.countCollectCountByUserId(userId), executor);
            // 6.关注数
            CompletableFuture<Long> followCountFT = CompletableFuture.supplyAsync(() -> userService.countFollowCountByUserId(userId), executor);
            // 7.红包数
            CompletableFuture<Long> redBagCountFT = CompletableFuture.supplyAsync(() -> userService.countRedBagCountByUserId(userId), executor);
            // 8.卡券数
            CompletableFuture<Long> couponCountFT = CompletableFuture.supplyAsync(() -> userService.countCouponCountByUserId(userId), executor);
 
            // 等待全部线程执行完毕 这里一定要设超时时间,不然会一直等待
            CompletableFuture.allOf(articleCountFT, LikeCountFT, fansCountFT, msgCountFT, collectCountFT, followCountFT, redBagCountFT, couponCountFT).get(6, TimeUnit.SECONDS);
 
            // 必须设置合理的超时时间
            UserBehaviorDataDTO userBehaviorData = UserBehaviorDataDTO.builder().articleCount(articleCountFT.get()).likeCount(LikeCountFT.get()).fansCount(fansCountFT.get()).msgCount(msgCountFT.get()).collectCount(collectCountFT.get()).followCount(followCountFT.get()).redBagCount(redBagCountFT.get()).couponCount(couponCountFT.get()).build();
            return userBehaviorData;
        } catch (Exception e) {
            log.error("get user behavior data error", e);
            return new UserBehaviorDataDTO();
        }
    }
 

这里用户服务类中我采用线程睡眠来模拟查询耗时

4.测试

访问测试接口,日志输出如下:

UserController的线程:Thread[http-nio-8080-exec-2,5,main]
MyFutureTask的线程:Thread[http-nio-8080-exec-2,5,main]
UserService获取ArticleCount的线程  pool-2-thread-1
UserService获取likeCount的线程  pool-2-thread-2
UserService获取MsgCount的线程  pool-2-thread-4
UserService获取CollectCount的线程  pool-2-thread-5
UserService获取FollowCount的线程  pool-2-thread-6
UserService获取RedBagCount的线程  pool-2-thread-7
UserService获取CouponCount的线程  pool-2-thread-8
获取CouponCount===睡眠:0s
获取RedBagCount===睡眠:1s
获取FollowCount===睡眠:1s
获取CollectCount==睡眠:2s
获取FansCount===睡眠:1s
UserService获取FansCount的线程  pool-2-thread-3
获取ArticleCount===睡眠:1s
获取MsgCount===睡眠:1s
获取likeCount===睡眠:2s
===============总耗时:2.019秒


可以看到,总耗时主要取决于耗时最长的那个操作,相比于串行查询肯定快多了

其他优化点

除了使用CompletableFuture并行查询优化外,还有以下可以提高接口查询速率的方法:


数据缓存: 对于一些常用且不经常变动的数据,可以考虑加入redis缓存或者本地缓存,减少数据库查询。

异步持久化: 对于一些不需要立即写入数据库的数据,可以先放入消息队列,由后台程序异步处理,减轻数据库压力。

分库分表: 对于数据量较大的表,可以考虑分库分表,避免单表数据量过大带来的查询效率问题。

总结


CompletableFuture为Java提供了强大的异步编程能力,可以极大地提高应用的并发能力和响应速度。通过并行执行多个查询任务,我们可以大幅减少接口的响应时间,优化用户体验。同时,CompletableFuture的代码风格函数式、简洁、优雅,也使得代码更加易读易维护。


但是,异步编程也不是万能的,它需要开发者转变思维模式,还需要权衡利弊。在实际项目中,我们可以结合其他优化手段,选择合适的方案,以达到最佳的性能效果。

相关文章
|
4天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
23 9
|
7天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
4天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
21 3
|
6天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
8天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
17天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
7天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
17 1
|
7天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
8天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
33 1