前言
在Web应用开发中,一个界面可能需要同时请求多个接口来获取不同信息。传统的做法是编写一个聚合接口同步获取这些数据,第二种方法是分多次请求来获取数据。这两种方式虽然简单直观,但效率比较低下,随着应用复杂度的增加,这种低效的做法将会带来严重的性能问题。
异步编程模型可以很好地解决这个问题。多个任务可以同时执行,互不影响,从而大幅提高应用的响应速度和吞吐量。Java 8 中引入的CompletableFuture为异步编程提供了强有力的支持,使得编写异步代码变得更加简单。本文将重点介绍如何利用CompletableFuture优化并发查询接口的响应速度。
实现思路
要优化并发查询接口的响应速度,传统的优化方式是通过多线程来并行执行多个查询任务。但这种做法存在一些缺陷:
创建和管理线程的开销较大,如果线程数量过多,会给系统带来很大的压力。
如果查询任务的执行时间不均匀,会导致部分线程需要长时间等待,资源利用率低下。
而CompletableFuture提供了一种更优雅、更高效的解决方案。其核心思路是:
每个查询任务都封装为一个CompletableFuture异步任务,由线程池并行执行。
通过CompletableFuture.allOf()方法等待所有异步任务完成。
最后从每个任务的结果中组装出最终需要的数据对象。
CompletableFuture快速入门
在JDK8以后,CompletableFuture提供了丰富的API用于异步编程,下面列举了一些最常见的用法:
1.创建CompletableFuture
有多种方式可以创建CompletableFuture:
// 从一个供给函数创建 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); // 从一个运行函数创建 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("Hello")); // 从一个已有的结果创建 CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
2.链式调用
CompletableFuture支持链式调用,可以方便地对异步结果进行转换和组合:
CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s + " World") // 对结果进行转换 .thenCompose(s -> getResult(s)); // 组合另一个异步操作
3.异常处理
通过exceptionally()方法可以对异常情况进行处理:
String result = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("error"); }).exceptionally(ex -> { // 处理异常 return "Default Value"; }).get();
4.组合多个CompletableFuture
通过allOf,anyOf这两种方式我们可以让任务之间协同工作,join()和get()方法都是阻塞调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。
get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。
而 join() 会抛出未经检查的异常。
// 等待所有任务完成 CompletableFuture.allOf(future1, future2, future3).get(); CompletableFuture.allOf(future1, future2, future3).join(); // 只要任意一个任务完成即可 CompletableFuture.anyOf(future1, future2, future3).get(); CompletableFuture.anyOf(future1, future2, future3).join(); // 规定超时时间,防止一直堵塞 CompletableFuture.allOf(future1, future2, future3).get(6, TimeUnit.SECONDS);
5.设置超时时间
我们可以通过下面的方式可以设置某个CompletableFuture的超时时间:
String result = CompletableFuture.supplyAsync(() -> "Hello") .completeOnTimeout("Timeout!", 1, TimeUnit.SECONDS) .get();
代码实现
1.初始化线程池
application.yaml配置文件
# 线程池配置 thread: pool: corePoolSize: 10 maxPoolSize: 20 queueCapacity: 100 keepAliveSeconds: 60
线程池配置类ThreadPoolConfig
/** * @author Luckysj @刘仕杰 * @description 线程池配置 * @create 2024/03/19 21:43:57 */ @Configuration public class ThreadPoolConfig { @Value("${thread.pool.corePoolSize}") private int corePoolSize; @Value("${thread.pool.maxPoolSize}") private int maxPoolSize; @Value("${thread.pool.queueCapacity}") private int queueCapacity; @Value("${thread.pool.keepAliveSeconds}") private int keepAliveSeconds; @Bean public ThreadPoolExecutor threadPoolExecutor() { return new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.CallerRunsPolicy()); } }
2.封装响应信息聚合对象
我们这里模拟用户相关的界面,这里需要点赞数,粉丝数,文章数等信息
/** * @author Luckysj @刘仕杰 * @description 信息聚合对象 * @create 2024/03/19 21:48:13 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class UserBehaviorDataDTO { //用户ID private Long userId ; //发布文章数 private Long articleCount ; //点赞数 private Long likeCount ; //粉丝数 private Long fansCount ; //消息数 private Long msgCount ; //收藏数 private Long collectCount ; //关注数 private Long followCount ; //红包数 private Long redBagCount ; // 卡券数 private Long couponCount ; }
3.通过CompletableFuture异步执行每一个查询操作
如下,我们定义了一个异步任务类,创建每一个查询操作的CompletableFuture异步任务放入线程中执行,并利用allOf等待全部任务执行完成,执行完成后组装查询信息到聚合对象中返回
/** * @author Luckysj @刘仕杰 * @description 一个页面可能有多达10个左右的一个用户行为数据,我们可以通过多线程来提高查询速率 * @create 2024/03/19 21:45:04 */ @Slf4j @Component public class MyFutureTask { @Resource UserService userService; // 线程池 @Resource private ExecutorService executor; public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) { System.out.println("MyFutureTask的线程:" + Thread.currentThread()); try { // 1.发布文章数 CompletableFuture<Long> articleCountFT = CompletableFuture.supplyAsync(() -> userService.countArticleCountByUserId(userId), executor); // 2.点赞数 CompletableFuture<Long> LikeCountFT = CompletableFuture.supplyAsync(() -> userService.countLikeCountByUserId(userId), executor); // 3.粉丝数 CompletableFuture<Long> fansCountFT = CompletableFuture.supplyAsync(() -> userService.countFansCountByUserId(userId), executor); // 4.消息数 CompletableFuture<Long> msgCountFT = CompletableFuture.supplyAsync(() -> userService.countMsgCountByUserId(userId), executor); // 5.收藏数 CompletableFuture<Long> collectCountFT = CompletableFuture.supplyAsync(() -> userService.countCollectCountByUserId(userId), executor); // 6.关注数 CompletableFuture<Long> followCountFT = CompletableFuture.supplyAsync(() -> userService.countFollowCountByUserId(userId), executor); // 7.红包数 CompletableFuture<Long> redBagCountFT = CompletableFuture.supplyAsync(() -> userService.countRedBagCountByUserId(userId), executor); // 8.卡券数 CompletableFuture<Long> couponCountFT = CompletableFuture.supplyAsync(() -> userService.countCouponCountByUserId(userId), executor); // 等待全部线程执行完毕 这里一定要设超时时间,不然会一直等待 CompletableFuture.allOf(articleCountFT, LikeCountFT, fansCountFT, msgCountFT, collectCountFT, followCountFT, redBagCountFT, couponCountFT).get(6, TimeUnit.SECONDS); // 必须设置合理的超时时间 UserBehaviorDataDTO userBehaviorData = UserBehaviorDataDTO.builder().articleCount(articleCountFT.get()).likeCount(LikeCountFT.get()).fansCount(fansCountFT.get()).msgCount(msgCountFT.get()).collectCount(collectCountFT.get()).followCount(followCountFT.get()).redBagCount(redBagCountFT.get()).couponCount(couponCountFT.get()).build(); return userBehaviorData; } catch (Exception e) { log.error("get user behavior data error", e); return new UserBehaviorDataDTO(); } }
这里用户服务类中我采用线程睡眠来模拟查询耗时
4.测试
访问测试接口,日志输出如下:
UserController的线程:Thread[http-nio-8080-exec-2,5,main] MyFutureTask的线程:Thread[http-nio-8080-exec-2,5,main] UserService获取ArticleCount的线程 pool-2-thread-1 UserService获取likeCount的线程 pool-2-thread-2 UserService获取MsgCount的线程 pool-2-thread-4 UserService获取CollectCount的线程 pool-2-thread-5 UserService获取FollowCount的线程 pool-2-thread-6 UserService获取RedBagCount的线程 pool-2-thread-7 UserService获取CouponCount的线程 pool-2-thread-8 获取CouponCount===睡眠:0s 获取RedBagCount===睡眠:1s 获取FollowCount===睡眠:1s 获取CollectCount==睡眠:2s 获取FansCount===睡眠:1s UserService获取FansCount的线程 pool-2-thread-3 获取ArticleCount===睡眠:1s 获取MsgCount===睡眠:1s 获取likeCount===睡眠:2s ===============总耗时:2.019秒
可以看到,总耗时主要取决于耗时最长的那个操作,相比于串行查询肯定快多了
其他优化点
除了使用CompletableFuture并行查询优化外,还有以下可以提高接口查询速率的方法:
数据缓存: 对于一些常用且不经常变动的数据,可以考虑加入redis缓存或者本地缓存,减少数据库查询。
异步持久化: 对于一些不需要立即写入数据库的数据,可以先放入消息队列,由后台程序异步处理,减轻数据库压力。
分库分表: 对于数据量较大的表,可以考虑分库分表,避免单表数据量过大带来的查询效率问题。
总结
CompletableFuture为Java提供了强大的异步编程能力,可以极大地提高应用的并发能力和响应速度。通过并行执行多个查询任务,我们可以大幅减少接口的响应时间,优化用户体验。同时,CompletableFuture的代码风格函数式、简洁、优雅,也使得代码更加易读易维护。
但是,异步编程也不是万能的,它需要开发者转变思维模式,还需要权衡利弊。在实际项目中,我们可以结合其他优化手段,选择合适的方案,以达到最佳的性能效果。