多个CompletableFuture进行组合运算
CompletableFuture功能强大的原因之一是其可以让两个或者多个Completable-Future进行运算来产生结果,下面我们来看其提供的几组函数:
1)基于thenCompose实现当一个CompletableFuture执行完毕后,执行另外一个CompletableFuture:
public class TestTwoCompletableFuture { // 1.异步任务,返回future public static CompletableFuture<String> doSomethingOne(String encodedCompanyId) { // 1.1创建异步任务 return CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { // 1.1.1休眠1s,模拟任务计算 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 1.1.2 解密,并返回结果 String id = encodedCompanyId; return id; } }); } // 2.开启异步任务,返回future public static CompletableFuture<String> doSomethingTwo(String companyId) { return CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { // 2.1 休眠3s,模拟计算 try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 2.2 查询公司信息,转换为str,并返回 String str = companyId + ":alibaba"; return str; } }); } public static void main(String[] args) throws InterruptedException, ExecutionException { // I,等doSomethingOne执行完毕后,接着执行doSomethingTwo CompletableFuture result = doSomethingOne("123").thenCompose(id -> doSomethingTwo(id)); System.out.println(result.get()); } }
上述main函数中首先调用方法doSomethingOne(“123”)开启了一个异步任务,并返回了对应的CompletableFuture对象,我们取名为future1,然后在future1的基础上调用了thenCompose方法,企图让future1执行完毕后,激活使用其结果作为doSomethingTwo(String companyId)方法的参数的任务。
2)基于thenCombine实现当两个并发运行的CompletableFuture任务都完成后,使用两者的结果作为参数再执行一个异步任务,这里只需要把上面例子中的:
CompletableFuture result = doSomethingOne("123").thenCompose(id -> doSomethingTwo(id));
修改为:
result = doSomethingOne("123").thenCombine(doSomethingTwo("456"), (one, two) -> { return one + " " + two; });
3)基于allOf等待多个并发运行的CompletableFuture任务执行完毕:
public static void allOf() throws InterruptedException, ExecutionException { // 1.创建future列表 List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(doSomethingOne("1")); futureList.add(doSomethingOne("2")); futureList.add(doSomethingOne("3")); futureList.add(doSomethingOne("4")); // 2.转换多个future为一个 CompletableFuture<Void> result = CompletableFuture .allOf(futureList.toArray(new CompletableFuture[futureList.size()])); // 3.等待所有future都完成 System.out.println(result.get()); }
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用allOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中所有任务执行完毕才返回。
4)基于anyOf等多个并发运行的CompletableFuture任务中有一个执行完毕就返回
public static void anyOf() throws InterruptedException, ExecutionException { // 1.创建future列表 List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(doSomethingOne("1")); futureList.add(doSomethingOne("2")); futureList.add(doSomethingTwo("3")); // 2.转换多个future为一个 CompletableFuture<Object> result = CompletableFuture .anyOf(futureList.toArray(new CompletableFuture[futureList.size()])); // 3.等待某一个future完成 System.out.println(result.get()); }
如上代码1调用了四次doSomethingOne方法,分别返回一个CompletableFuture对象,然后收集这些CompletableFuture到futureList列表。
代码2调用anyOf方法把多个CompletableFuture转换为一个result,代码3在result上调用get()方法会阻塞调用线程,直到futureList列表中有一个任务执行完毕才返回。
异常处理
前文的代码为我们演示的功能都是当异步任务内可以正常设置任务结果时的情况,但是情况并不总是这样的,比如下面这段代码:
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { // 1.创建一个CompletableFuture对象 CompletableFuture<String> future = new CompletableFuture<String>(); // 2.开启线程计算任务结果,并设置 new Thread(() -> { // 2.1休眠3s,模拟任务计算 try { // 2.1.1抛出异常 if (true) { throw new RuntimeException("excetion test"); } // 2.1.2设置正常结果 future.complete("ok"); } catch (Exception e) { } // 2.2设置计算结果到future System.out.println("----" + Thread.currentThread().getName() + " set future result----"); }, "thread-1").start(); // 3.等待计算结果 System.out.println(future.get()); }
由上述代码可知,在代码2.1.2设置正常结果前,代码2.1.1抛出了异常,这会导致代码3一直阻塞,所以我们不仅需要考虑正常设置结果的情况,还需要考虑异常的情况,其实CompletableFuture提供了completeExceptionally方法来处理异常情况,将上述代码修改为如下所示。
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { // 1.创建一个CompletableFuture对象 CompletableFuture<String> future = new CompletableFuture<String>(); // 2.开启线程计算任务结果,并设置 new Thread(() -> { // 2.1休眠3s,模拟任务计算 try { // 2.1.1 抛出异常 if (true) { throw new RuntimeException("excetion test"); } // 2.1.2设置正常结果 future.complete("ok"); } catch (Exception e) { // 2.1.3 设置异常结果 future.completeExceptionally(e); } // 2.2设置计算结果到future System.out.println("----" + Thread.currentThread().getName() + " set future result----"); }, "thread-1").start(); // 3.等待计算结果 System.out.println(future.get()); }
如上代码2.1.3表示当出现异常时把异常信息设置到future内部,这样代码3就会在抛出异常后终止。
其实我们还可以修改代码3为:
System.out.println(future.exceptionally(t -> "default").get());// 默认值
实现当出现异常时返回默认值。