Pre
Java8 - 使用工厂方法 supplyAsync创建 CompletableFuture
接着上面的例子
假设非常不幸,无法控制 Shop 类提供API的具体实现,最终提供给你的API都是同步阻塞式的方法。这也是当你试图使用服务提供的HTTP API时最常发生的情况。你会学到如何以异步的方式查询多个商店,避免被单一的请求所阻塞,并由此提升你的“最佳价格查询器”的性能和吞吐量。
避免同步阻塞的困扰
假设你需要查询的所有商店只提供了同步API,换句话说,你有一个商家的列表,如下所示:
List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll"));
你需要使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称、该商店中指定商品的价格:
public List<String> findPrices(String product);
V1.0 改进 -采用Stream 顺序查询 (不理想)
第一个想法可能是使用 Stream 特性。
【采用顺序查询所有商店的方式实现的 findPrices 方法】
public List<String> findPrices(String product) { return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))) .collect(toList()); }
好吧,这段代码看起来非常直白。 此外,也请记录下方法的执行时间,通过这
些数据,我们可以比较优化之后的方法会带来多大的性能提升,具体的代码清单如下。
【验证 findPrices 的正确性和执行性能】
long start = System.nanoTime(); System.out.println(findPrices("myPhone27S")); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Done in " + duration + " msecs");
输出
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74] Done in 4032 msecs
正如你预期的, findPrices 方法的执行时间4S+,因为对这4个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大于1S的时间计算请求商品的价格。
怎样才能改进这个结果呢?
V2.0 改进 - 使用并行流对请求进行并行操作 (good)
对V1.0改成并行试试?
【对 findPrices 进行并行操作】
public List<String> findPrices(String product) { return shops.parallelStream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); }
区别在于 parallelStream ,使用并行流并行流从不同的商店获取价格。
运行代码,与V·1.0的执行结果相比较,发现了新版 findPrices 的改进了吧。
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74] Done in 1180 msecs
相当不错啊!看起来这是个简单但有效的主意:现在对四个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1S多一点儿。
还能能做得更好吗? 要不试试CompletableFuture ,将 findPrices 方法中对不同商店的同步调用替换为异步调用。
V3.0 改进 - 使用 CompletableFuture发起异步请求 ()
我们可以使用工厂方法 supplyAsync 创建 CompletableFuture 对象。让我们把它利用起来:
List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))) .collect(toList());
使用这种方式,你会得到一个 List<CompletableFuture<String>> ,列表中的每个CompletableFuture 对象在计算完成后都包含商店的 String 类型的名称。但是,由于你用CompletableFutures 实现的 findPrices 方法要求返回一个 List<String> ,你需要等待所有的 future 执行完毕,将其包含的值抽取出来,填充到列表中才能返回
为了实现这个效果,你可以向最初的 List<CompletableFuture<String>> 添加第二个map 操作,对 List 中的所有 future 对象执行 join 操作,一个接一个地等待它们运行结束。
Note: CompletableFuture 类中的 join 方法和 Future 接口中的 get 有相同的含义,并且也声明在Future 接口中,它们唯一的不同是 join 不会抛出任何检测到的异常。使用它你不再需要使用try / catch 语句块让你传递给第二个 map 方法的Lambda表达式变得过于臃肿。
所有这些整合在一起,你就可以重新实现 findPrices 了,具体代码如下
public List<String> findPrices(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(toList()); }
注意到了吗?这里使用了两个不同的 Stream 流水线,而不是在同一个处理流的流水线上一个接一个地放两个 map 操作——这其实是有缘由的。
考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建 CompletableFuture 对象只能在前一个操作结束之后执行查询指定商家的动作、通知 join方法返回计算结果。
【为什么 Stream 的延迟特性会引起顺序执行,以及如何避免】见下图
上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标识)是顺序的。事实上,新的 CompletableFuture 对象只有在前一个操作完全结束之后,才能创建。与此相反,图的下半部分展示了如何先将 CompletableFutures 对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。
运行代码 第三个版本 findPrices 方法的性能,你会得到下面这几行输出:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74] Done in 2005 msecs
超过2S意味着利用 CompletableFuture 实现的版本比刚开始原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,就让人更加沮丧
与此形成鲜明对比的是,我们为采用 CompletableFutures 完成的新版方法做了大量的工作!
但,这就是全部的真相吗?这种场景下使用 CompletableFutures 真的是浪费时间吗?或者我们可能漏了某些重要的东西?
更好的方案
并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果你想要增加第五个商家到商点列表中,让你的“最佳价格查询”应用



