前言
有段时间在研究WebSocket和HTTP2.0版本,JDK11的新的HttpClient支持HTTP/2和WebSocket,于是我就尝试着用了用,然后就发现了CompletableFuture这个类, 我当时写的代码如下:
public static void main(String[] args) { HttpClient httpClient = HttpClient.newBuilder().version(Version.HTTP_2).build(); httpClient.version(); CompletableFuture<WebSocket> result = httpClient.newWebSocketBuilder().buildAsync(null, null); Class<? extends HttpClient> clazz = httpClient.getClass(); }
然后我就点进去看了下CompletableFuture的源码,发现这个类自1.8被引入,也是关于异步编程的。该类实现了Future和CompletionStage接口. Future我们前面的文章已经讨论过了,所以CompletableFuture具备Future的特性,而CompletionStage是什么呢?首先这个类名我们应该怎么理解, CompletionStage是一个合成词:
- Completion: 完成, 结束。
- Stage: 步骤,步,阶段
所以我们姑且就可以将CompletionStage理解为完成步骤或者完成阶段。在研究CompletableFuture,我们先看下CompletionStage。本篇需要Lambda表达式的相关基础, 如果对Lambda表达式还不是很了解,可以参看我在掘金写的文章:
- Lambda表达式 函数式编程 Stream API学习笔记
先看CompletionStage
先看注释
A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. A stage completes upon termination of its computation, but this may in turn trigger other dependent stages. The functionality defined in this interface takes only a few basic forms, which expand out to a larger set of methods to capture a range of usage styles:
一个异步计算的步骤可能表现为一个动作或者另一个CompletionStage(完成阶段)完成时计算值。一个步骤在计算完成时结束,但是这个步骤也可能触发其他步骤。在CompletionStage接口中定义了一些基本形式的函数,这些方法可以被扩展以适应不同的风格。
- The computation performed by a stage may be expressed as a Function, Consumer, or Runnable (using methods with names including apply, accept, or run, respectively) depending on whether it requires arguments and/or produces results. For example, stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()). An additional form (compose) applies functions of stages themselves, rather than their results. 这一类步骤的计算被表现为Function(Lambda表达式,函数式接口)、Consumer(Lambda表达式 函数式接口)、Runnable(方法名包括apply、accept、run) 例子:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println() //简单讲述下这段代码的意思, thenApply拿到接收方法的值,传递给thenAccept,然后在thenRun.
另一种形式的用法是应用步骤本身的函数而不是他们的结果。
- One stage's execution may be triggered by completion of a single stage, or both of two stages, or either of two stages. Dependencies on a single stage are arranged using methods with prefix then. Those triggered by completion of both of two stages may combine their results or effects, using correspondingly named methods. Those triggered by either of two stages make no guarantees about which of the results or effects are used for the dependent stage's computation. 一个步骤的执行可能被另一个步骤或者两个步骤完成时,两个步骤中任意一个步骤完成触发. 依赖于一个步骤的用前缀为then的方法。被两个步骤完成才触发且需要使用结果的可以使用方法名包含combine的方法。两个步骤任意一个步骤完成都可以触发的步骤,下一个步骤所接收的结果无法保证是哪一个步骤,可以使用方法名宝行Either。
- Dependencies among stages control the triggering of computations, but do not otherwise guarantee any particular ordering. Additionally, execution of a new stage's computations may be arranged in any of three ways: default execution, default asynchronous execution (using methods with suffix async that employ the stage's default asynchronous execution facility), or custom (via a supplied Executor). The execution properties of default and async modes are specified by CompletionStage implementations, not this interface. Methods with explicit Executor arguments may have arbitrary execution properties, and might not even support concurrent execution, but are arranged for processing in a way that accommodates asynchrony.
如果是各个步骤之间的计算和触发不要求特定的顺序,就可以考虑异步执行,步骤的执行有三种形式: 默认执行、异步执行(方法名带有async的,在执行的时候就是异步执行), 定制(支持接收执行者)。默认和异步模式的执行属性由CompletionStage实现指定,而不是在这个接口,具有显式Executor参数的方法可能具有任意的执行属性,甚至可能不支持并发执行,但其处理方式可以适应异步。
- Two method forms support processing whether the triggering stage completed normally or exceptionally: Method whenComplete allows injection of an action regardless of outcome, otherwise preserving the outcome in its completion. Method handle additionally allows the stage to compute a replacement result that may enable further processing by other dependent stages. In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause. If a stage is dependent on both of two stages, and both complete exceptionally, then the CompletionException may correspond to either one of these exceptions. If a stage is dependent on either of two others, and only one of them completes exceptionally, no guarantees are made about whether the dependent stage completes normally or exceptionally. In the case of method whenComplete, when the supplied action itself encounters an exception, then the stage exceptionally completes with this exception if not already completed exceptionally.
正常完成或者异常完成的触发有两个方法支持。方法whenComplete在执行的时候是忽略是正常结束还是异常结束, 如果是异常结束则whenComplete接收的值为null,异常有值。handle方法可以对上一步的结果再进行计算,让下一个阶段接着处理。再有,如果其他步骤因为发成了异常和错误而结束,所有依赖此步骤的步骤将会被完成(completeExceptionally方法),completeExceptionally持有异常完成的原因。如果一个步骤依赖于某两个步骤中的一个步骤,如果被依赖的两个步骤意外结束,不保证下一个步骤的正常完成还是异常完成。如果whenComplete接收的动作发生了异常,下一个阶段也会携带whenComplete方法发生的异常。
我的体会
如果某项任务比较大,我们通常会将浙这些任务进行拆解,分工。在一些步骤完成之后,完成之后进行下一步骤,这在现实生活中是相当常见的。那么软件的世界呢,在Java中呢,我们需要委托线程进行工作呢? 工作模型像下面这样:我们可以使用的接口有哪些呢?想想之前的线程协作:
- CountDownLatch
- CyclicBarrier
- Semaphore
- Future
- FutureTask
这些组合起来去完成我们提出的目标, 似乎有点费手脚,CompletionStage就为解决这样的计算任务而生,将任务拆解为一个一个的步骤,任务完成返回CompletionStage。注释提到的任务模型:
- 一个接一个(一个任务触发完成之后来到下一个任务) 方法名中包含apply、accept、then、run对应这种任务模型
- 多个任务完成之后来到下一个任务 方法名中包含combine对应这种任务模型
- 多个任务中只要有一个完成就触发下一个任务 方法名中带有Either对应这种任务模型
带有async是异步执行也就是会开启一个线程执行任务。方法正常完成和异常完成触发的回调为whenComplete和handle. 但CompletionStage只提供了规范,真正要使用我们还要看它的实现类为CompletableFuture。
CompletableFuture 简单使用示例
单链任务使用示例
/** * 做菜任务示例 * 先买菜,切菜,然后再吃饭 */ private static void makeFoodDemo() { CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> { System.out.println("去买菜"); return "买菜成功"; }).thenApplyAsync(o -> { System.out.println("开始做菜"); return "做菜完成"; }); // 任务执行完毕后 会触发此方法 System.out.println(task.join()+"开始吃饭"); }
两个都成功才触发示例
/** * 合并结果 */ private static void appointmentTaskDemo() { CompletableFuture<String> boyTask = CompletableFuture.supplyAsync(() -> { return "男孩子发出邀请"; }); boyTask.thenCombineAsync(CompletableFuture.supplyAsync(()->"女孩子接收邀请"),(o1,o2)->{ System.out.println(o1); System.out.println(o2); return "开始约会"; }).thenAccept(o-> System.out.println(o)); }
两个任意一个成功才触发示例
/** * 做菜任务示例 * 先买菜,切菜,然后再吃饭 */ private static void makeFoodDemo() { CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> { System.out.println("去买菜"); return "买菜成功"; }).thenApplyAsync(o -> { System.out.println("开始做菜"); return "做菜完成"; }); // 任务执行完毕后 会触发此方法 System.out.println(task.join()+"开始吃饭"); }
总结
有了CompletableFuture切割任务是不是就有够顺滑了呢。这也就是我学习的方法,有注释优先看注释,大多数情况下,注释都写的简单明了。