CompletableFuture
相比于jdk5所提出的future概念,future在执行的时候支持异步处理,但是在回调的过程中依旧是难免会遇到需要等待的情况。
在jdk8里面,出现了CompletableFuture的新概念,支持对于异步处理完成任务之后自行处理数据。当发生异常的时候也能按照自定义的逻辑来处理。
如何通过使用CompletableFuture提升查询的性能呢?
下边我举个例子来演示:
首先我们定义一个UserInfo对象:
/** * @author idea * @data 2020/2/22 */ public class UserInfo { private Integer id; private String name; private Integer jobId; private String jobDes; private Integer carId; private String carDes; private Integer homeId; private String homeDes; public Integer getId() { return id; } public UserInfo setId(Integer id) { this.id = id; return this; } public String getName() { return name; } public UserInfo setName(String name) { this.name = name; return this; } public Integer getJobId() { return jobId; } public UserInfo setJobId(Integer jobId) { this.jobId = jobId; return this; } public String getJobDes() { return jobDes; } public UserInfo setJobDes(String jobDes) { this.jobDes = jobDes; return this; } public Integer getCarId() { return carId; } public UserInfo setCarId(Integer carId) { this.carId = carId; return this; } public String getCarDes() { return carDes; } public UserInfo setCarDes(String carDes) { this.carDes = carDes; return this; } public Integer getHomeId() { return homeId; } public UserInfo setHomeId(Integer homeId) { this.homeId = homeId; return this; } public String getHomeDes() { return homeDes; } public UserInfo setHomeDes(String homeDes) { this.homeDes = homeDes; return this; } }
这个对象里面的homeid,jobid,carid都是用于匹配对应的住房信息描述,职业信息描述,购车信息描述。
对于将id转换为描述信息的方式需要通过额外的sql查询,这里做了个简单的工具类来进行模拟:
import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** * @author idea * @data 2020/2/22 */ public class QueryUtils { public String queryCar(Integer carId){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "car_desc"; } public String queryJob(Integer jobId){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "job_desc"; } public String queryHome(Integer homeId){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "home_desc"; } }
这个工具类的功能看起来会比较通俗易懂,在常规的逻辑里面,我们做批量对象的转换大多数都是基于List遍历,然后在循环里面批量查询,这样的方式并非说不行,而是显得比较过于“暴力”。
假设每次查询需要消耗1s,那么遍历的一个size为n的集合查询消耗的时间就是n * 3s。
下边来介绍一种更为方便的技巧:CompletableFuture
定义一个QuerySupplier 实现Supplier接口,根据注入的类型进行转译查询:
import java.util.function.Supplier; public class QuerySuppiler implements Supplier<String> { private Integer id; private String type; private QueryUtils queryUtils; public QuerySuppiler(Integer id, String type,QueryUtils queryUtils) { this.id = id; this.type = type; this.queryUtils=queryUtils; } @Override public String get() { if("home".equals(type)){ return queryUtils.queryHome(id); }else if ("job".equals(type)){ return queryUtils.queryJob(id); }else if ("car".equals(type)){ return queryUtils.queryCar(id); } return null; } }
由于对应的carid,homeid,jobid都需要到指定的k,v配置表里面通过核心查询包装器来进行转译,因此通常的做法就是在for循环里面一个个地进行遍历解析,这样的做法也比较易于理解。
QuerySuppiler 是我写的一个用于做对象解析的服务,代码如下所示:
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; /** * @author idea * @data 2020/2/22 */ public class QueryUserService { private Supplier<QueryUtils> queryUtilsSupplier = QueryUtils::new; public UserInfo converUserInfo(UserInfo userInfo) { QuerySuppiler querySuppiler1 = new QuerySuppiler(userInfo.getCarId(), "car", queryUtilsSupplier.get()); CompletableFuture<String> getCarDesc = CompletableFuture.supplyAsync(querySuppiler1); getCarDesc.thenAccept(new Consumer<String>() { --1 @Override public void accept(String carDesc) { userInfo.setCarDes(carDesc); } }); QuerySuppiler querySuppiler2 = new QuerySuppiler(userInfo.getHomeId(), "home", queryUtilsSupplier.get()); CompletableFuture<String> getHomeDesc = CompletableFuture.supplyAsync(querySuppiler2); getHomeDesc.thenAccept(new Consumer<String>() { --2 @Override public void accept(String homeDesc) { userInfo.setHomeDes(homeDesc); } }); QuerySuppiler querySuppiler3 = new QuerySuppiler(userInfo.getJobId(), "job", queryUtilsSupplier.get()); CompletableFuture<String> getJobDesc = CompletableFuture.supplyAsync(querySuppiler3); getJobDesc.thenAccept(new Consumer<String>() { --3 @Override public void accept(String jobDesc) { userInfo.setJobDes(jobDesc); } }); CompletableFuture<Void> getUserInfo = CompletableFuture.allOf(getCarDesc, getHomeDesc, getJobDesc); getUserInfo.thenAccept(new Consumer<Void>() { @Override public void accept(Void result) { System.out.println("全部完成查询" ); } }); getUserInfo.join(); --4 return userInfo; } public static void main(String[] args) { long begin= System.currentTimeMillis(); //多线程环境需要注意线程安全问题 List<UserInfo> userInfoList=Collections.synchronizedList(new ArrayList<>()); for(int i=0;i<=20;i++){ UserInfo userInfo=new UserInfo(); userInfo.setId(i); userInfo.setName("username"+i); userInfo.setCarId(i); userInfo.setJobId(i); userInfo.setHomeId(i); userInfoList.add(userInfo); } //stream 查询一个用户花费3s 并行计算后一个用户1秒左右 查询21个用户花费21秒 //parallelStream 速度更慢 userInfoList.stream() .map(userInfo->{ QueryUserService queryUserService=new QueryUserService(); userInfo =queryUserService.converUserInfo(userInfo); return userInfo; }).collect(Collectors.toList()); System.out.println("============="); long end=System.currentTimeMillis(); System.out.println(end-begin); } }
看看这段代码的—1,—2,—3部分,三个执行点的位置在使用了thenAccept组装数据之后,还是可以避开串行化获取数据的情况。只有在—4的位置才会发生堵塞。这样对于性能的提升效果更佳。
这里进行模拟测试,采用原始暴力手段查询所消耗的时间是20 * 3 =60秒,但是这里使用了CompletableFuture之后,查询的时间就会缩短为了21秒。
结果:
全部完成查询 ============= 21223
这是一种使用了空间换时间的思路,或许你会说,异步查询如果使用FutureTask是不是也
可以呢。嗯嗯,是的,但是使用future有个问题,就是在于返回获取异步结果的时候需要有等待状态,这个等待的状态是需要消耗时间进行堵塞的。
这里我也做了关于使用普通FutureTask来执行查询优化的结果:
/** * 使用 FutureTask 来优化查询 * * @param userInfo * @return */ public UserInfo converUserInfoV2(UserInfo userInfo) { Callable<String> homeCallable=new Callable() { @Override public Object call() throws Exception { return queryUtilsSupplier.get().queryHome(userInfo.getHomeId()); } }; FutureTask<String> getHomeDesc=new FutureTask<>(homeCallable); new Thread(getHomeDesc).start(); futureMap.put("homeCallable",getHomeDesc); Callable<String> carCallable=new Callable() { @Override public Object call() throws Exception { return queryUtilsSupplier.get().queryCar(userInfo.getCarId()); } }; FutureTask<String> getCarDesc=new FutureTask(carCallable); new Thread(getCarDesc).start(); futureMap.put("carCallable",getCarDesc); Callable<String> jobCallable=new Callable() { @Override public Object call() throws Exception { return queryUtilsSupplier.get().queryCar(userInfo.getJobId()); } }; FutureTask<String> getJobDesc=new FutureTask<>(jobCallable); new Thread(getJobDesc).start(); futureMap.put("jobCallable",getJobDesc); try { userInfo.setHomeDes((String) futureMap.get("homeCallable").get()); userInfo.setCarDes((String)futureMap.get("carCallable").get()); userInfo.setJobDes((String)futureMap.get("jobCallable").get()); } catch (Exception e) { e.printStackTrace(); } System.out.println("该对象完成查询" ); return userInfo; }
经过测试,使用 futuretask 进行优化的查询结果只有47s左右,远远不及CompletableFuture的性能高效.这是因为使用了futuretask的get方法依然是存在堵塞的情况。
关键部分看这段内容:
userInfo.setHomeDes((String) futureMap.get("homeCallable").get()); --1 userInfo.setCarDes((String)futureMap.get("carCallable").get()); --2 userInfo.setJobDes((String)futureMap.get("jobCallable").get()); --3
—1代码在执行的时候遇到了堵塞,然后—2和—3的get也需要进行等待,因此使用常规的futuretask进行优化,这里难免还是会有堵塞的情况。
END