- 多任务结果合并
示例代码如下:
CompletableFuture future1 = CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任务2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任务3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3}; CompletableFuture.allOf(futures) // 任务4 .whenCompleteAsync( (v, e) -> { List<Object> values = new ArrayList<>(); for (CompletableFuture future : futures) { try { values.add(future.get()); } catch (Exception ex) { } } Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task4"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("前面任务的处理结果:" + values); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); }, THREAD_POOL_EXECUTOR); 复制代码
执行结果如下:
completable-future-test-Thread-3开始执行任务:task2 completable-future-test-Thread-4开始执行任务:task3 completable-future-test-Thread-2开始执行任务:task1 正在执行任务task2 正在执行任务task3 正在执行任务task1 task2执行结束 task3执行结束 task1执行结束 completable-future-test-Thread-2开始执行任务:task4 前面任务的处理结果:[task1, task2, task3] 正在执行任务task4 task4执行结束 复制代码
之所以最后任务4的线程是completable-future-test-Thread-2
,那是因为线程池的核心线程数设置为3,线程数设置高一点就会创建新的线程处理;
从上述代码示例中,我们可以收获到另一个知识点:allOf()
,它的作用是要求所有的任务全部完成才能执行后面的任务。
任一任务完成
在一批任务中,只要有一个任务完成,那么就可以向后继续执行其他任务。
为了代码演示无异议,后续代码中,我们把线程数提升到4。
示例代码如下:
CompletableFuture future1 = CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任务2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任务3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture.anyOf(future1, future2, future3) .thenApplyAsync((taskResult) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task4"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("前面任务的处理结果:" + taskResult); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); 复制代码
执行结果如下:
completable-future-test-Thread-2开始执行任务:task1 completable-future-test-Thread-4开始执行任务:task3 completable-future-test-Thread-3开始执行任务:task2 正在执行任务task3 正在执行任务task2 正在执行任务task1 task1执行结束 task3执行结束 task2执行结束 completable-future-test-Thread-5开始执行任务:task4 前面任务的处理结果:task1 正在执行任务task4 task4执行结束 复制代码
可以看到,任务1第一个结束,所以任务4中接收到的执行结果就是任务1的返回值。
快速失败
在一批任务当中,只要有任意一个任务执行产生异常了,那么就直接结束;否则就要等待所有任务成功执行完毕。
示例代码如下:
CompletableFuture future1 = CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture future2 = CompletableFuture // 任务2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); throw new RuntimeException("任务2异常!"); }, THREAD_POOL_EXECUTOR); CompletableFuture future3 = CompletableFuture // 任务3 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); throw new RuntimeException("任务3异常!"); }, THREAD_POOL_EXECUTOR); CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3}; CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures); // 创建一个任务来监听异常 CompletableFuture<?> anyException = new CompletableFuture<>(); for (CompletableFuture<?> completableFuture : futures) { completableFuture.exceptionally((t) -> { // 任何一个任务异常都会让anyException任务完成 anyException.completeExceptionally(t); return null; }); } // 要么allCompletableFuture全部成功,要么一个出现异常就结束任务 CompletableFuture.anyOf(allCompletableFuture, anyException) .whenComplete((value, exception) -> { if (Objects.nonNull(exception)) { System.out.println("产生异常,提前结束!"); exception.printStackTrace(); return; } System.out.println("所有任务正常完成!"); }); 复制代码
执行结果如下:
completable-future-test-Thread-2开始执行任务:task1 completable-future-test-Thread-3开始执行任务:task2 completable-future-test-Thread-4开始执行任务:task3 正在执行任务task2 正在执行任务task3 正在执行任务task1 task2执行结束 task1执行结束 task3执行结束 产生异常,提前结束! java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务2异常! at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: 任务2异常! at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ... 3 more 复制代码
CompletableFuture
没有现成的api实现快速失败的功能,所以我们只能结合allOf()
和anyOf()
来逻辑来自定义方法完成快速失败的逻辑;1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。
2.把所有的CompletableFuture和额外的CompletableFuture放在
anyOf()
方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。
注意
- 异常处理
最后需要注意的是,所有的CompletableFuture
任务一定要加上异常处理:
CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) .whenComplete((v,e)->{ if(Objects.nonNull(e)){ // todo // 处理异常 } if(Objects.nonNull(v)){ // todo } }); 复制代码
还可以通过另外两个方法处理:exceptionally()
或者handle()
;
- 自定义线程池
CompletableFuture
默认的线程池是ForkJoinThreadPool
,建议大家在使用的时候尽可能地使用自定义线程池,这样方便后续的代码优化以及相关的日志查看。