使用CompletableFuture 发起异步请求
你已经知道我们可以使用工厂方法supplyAsync创建CompletableFuture对象。让我们把它利用起来:
public List<CompletableFuture<String>> findPricesFuture(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product)))) .collect(toList()); } 复制代码复制代码
使用这种方式,你会得到一个List,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称。但是,由于你用CompletableFutures实现的findPrices方法要求返回一个List,你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。
为了实现这个效果,你可以向最初的List施加第二个map操作,对List中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。所有这些整合在一起,你就可以重新实现findPrices了,具体代码如下。
public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product)))) .collect(toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } 复制代码复制代码
运行下代码了解下第三个版本findPrices方法的性能,你会得到下面这几行输出:
[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20] 完成时间 2207 复制代码复制代码
这个结果让人相当失望,不是吗?超过2秒意味着利用CompletableFuture实现的版本,比刚开始的代码中的原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,就让人更加沮丧。
与此形成鲜明对比的是,我们为采用CompletableFutures完成的新版方法做了大量的工作!但,这就是全部的真相吗?这种场景下使用CompletableFutures真的是浪费时间吗?或者我们可能漏掉了某些重要的东西?继续往下探究之前,让我们休息几分钟,尤其是想想你测试代码的机器是否足以以并行方式运行四个线程。
寻找更好的方案
并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果你想要增加第五个商家到商店列表中,让你的“最佳价格查询”应用对其进行处理,这时会发生什么情况?
public class BestPriceFinder { private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll"), new Shop("ShopEasy")); ... public List<String> findPricesParallel(String product) { return shops.parallelStream() .map(shop -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); } public List<String> findPricesSequential(String product) { return shops.stream() .map(shop -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); } public List<String> findPricesFuture(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product)))) .collect(toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } } public class BestPriceFinderMain { private static BestPriceFinder bestPriceFinder = new BestPriceFinder(); public static void main(String[] args) { execute("sequential", () -> bestPriceFinder.findPricesSequential("Old-Mi-Mix3")); } private static void execute(String msg, Supplier<List<String>> s) { long start = System.nanoTime(); System.out.println(s.get()); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println(msg + " 完成时间 " + duration); } } 复制代码复制代码
毫不意外,顺序执行版本的执行还是需要大约5秒多钟的时间,下面是执行的输出:
[BestPrice 价格 109.64, LetsSaveBig 价格 143.13, MyFavoriteShop 价格 175.50, BuyItAll 价格 154.20, ShopEasy 价格 147.92] sequential 完成时间 5139 复制代码复制代码
非常不幸,并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态的)的四个线程现在都处于繁忙状态,都在对前4个商店进行查询。第五个查询只能等到前面某一个操作完成释放出空闲线程才能继续,它的运行结果如下:
[BestPrice 价格 163.19, LetsSaveBig 价格 141.77, MyFavoriteShop 价格 159.81, BuyItAll 价格 165.02, ShopEasy 价格 165.81] parallel 完成时间 2106 复制代码复制代码
CompletableFuture版本的程序结果如何呢?我们也试着添加第5个商店对其进行了测试,结果如下:
[BestPrice 价格 144.31, LetsSaveBig 价格 142.49, MyFavoriteShop 价格 146.99, BuyItAll 价格 132.52, ShopEasy 价格 139.15] composed CompletableFuture 完成时间 2004 复制代码复制代码
CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。比如,如果你试图让你的代码处理9个商店,并行流版本耗时3143毫秒,而CompletableFuture版本耗时3009毫秒。它们看起来不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看你怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。
使用定制的执行器
就这个主题而言,明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?
你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你期望的CPU利用率是100%,你需要创建一个拥有400个线程的线程池。实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。
private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); 复制代码复制代码
注意,你现在正创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。现在,你可以将执行器作为第二个参数传递给supplyAsync工厂方法了。比如,你现在可以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:
CompletableFuture.supplyAsync(() -> String.format("%s 价格 %.2f", shop.getName(), shop.getPrice(product)), executor) 复制代码复制代码
改进之后,使用CompletableFuture方案的程序处理5个商店结果:
[BestPrice 价格 144.31, LetsSaveBig 价格 142.49, MyFavoriteShop 价格 146.99, BuyItAll 价格 132.52, ShopEasy 价格 139.15] composed CompletableFuture 完成时间 1004 复制代码复制代码
这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFutures向其提交任务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。
并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。书中使用这些API的建议如下。
- 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
- 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。
现在你已经了解了如何利用CompletableFuture为你的用户提供异步API,以及如何将一个同步又缓慢的服务转换为异步的服务。不过到目前为止,我们每个Future中进行的都是单次的操作。