前言
在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关。为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排。
先创建一个自定义的线程池,后续所有代码都会使用到:
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() { private final AtomicInteger THREAD_NUM = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); // 设置为守护线程,main线程结束就跟着一起结束,否则main函数结束jvm还在 t.setDaemon(true); t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet()); return t; } }, new ThreadPoolExecutor.AbortPolicy()); 复制代码
同步串行
同步串行代表任务1、任务2、任务3按时间先后顺序执行,并且都是同一个线程来执行。
示例代码如下:
CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenApply( (task1Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println("拿到上一个任务的返回值:" + task1Result); System.out.println(taskName + "执行结束"); return taskName; }) .thenAccept( (task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println("拿到上一个任务的返回值:" + task2Result); System.out.println(taskName + "执行结束"); }); 复制代码
执行结果:
completable-future-test-Thread-2开始执行任务:task1 正在执行任务task1 task1执行结束 completable-future-test-Thread-2开始执行任务:task2 正在执行任务task2 拿到上一个任务的返回值:task1 task2执行结束 completable-future-test-Thread-2开始执行任务:task3 正在执行任务task3 拿到上一个任务的返回值:task2 task3执行结束 复制代码
1.入口函数
supplyAsync()
代表一个异步的有返回值的函数,之所以异步,是与主线程区别,从线程池中的拿一个线程来执行。2.
thenApply()
和thenAccept()
没有Async
,意味着是和前面的任务共用一个线程,从执行结果上我们也可以看到线程名称相同。3.
thenApply()
需要接收上一个任务的返回值,并且自己也要有返回值。4.
thenAccept()
需要接收上一个任务的返回值,但是它不需要返回值。
异步串行
异步串行代表任务1、任务2、任务3按时间先后顺序执行,并由不同的线程来执行。
示例代码如下:
CompletableFuture // 有返回值 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) // 需要上一个任务的返回值,并且自身有返回值 .thenApplyAsync( (task1Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println("拿到上一个任务的返回值:" + task1Result); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) // 不需要上一个任务的返回值,自身也没有返回值 .thenRunAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println("thenRunAsync()不需要上一个任务的返回值"); System.out.println(taskName + "执行结束"); }, THREAD_POOL_EXECUTOR); 复制代码
执行结果如下:
completable-future-test-Thread-2开始执行任务:task1 正在执行任务task1 task1执行结束 completable-future-test-Thread-3开始执行任务:task2 正在执行任务task2 拿到上一个任务的返回值:task1 task2执行结束 completable-future-test-Thread-4开始执行任务:task3 正在执行任务task3 thenRunAsync()不需要上一个任务的返回值 task3执行结束 复制代码
1.入口函数依然是
supplyAsync()
,需要传入一个有返回值的函数作为参数;如果想要没有返回值的函数传进来的话,可以使用CompletableFuture.runAsync()
;2.
thenApplyAsync()
和thenRunAsync()
分别表示里面的任务都是异步执行的,和执行前面的任务不是同一个线程;3.
thenRunAsync()
需要传入一个既不需要参数,也没有返回值的任务;
并行任务
并行代表任务1、任务2、任务3没有依赖关系,分别由不同的线程执行;
示例代码如下:
CompletableFuture<String> future1 = CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); CompletableFuture<Void> future2 = CompletableFuture .runAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); }, THREAD_POOL_EXECUTOR); CompletableFuture<String> future3 = CompletableFuture .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR); 复制代码
执行结果如下:
completable-future-test-Thread-4开始执行任务:task3 completable-future-test-Thread-2开始执行任务:task1 completable-future-test-Thread-3开始执行任务:task2 正在执行任务task3 task3执行结束 正在执行任务task2 正在执行任务task1 task2执行结束 task1执行结束 复制代码
一看执行结果,明显是乱序的,并且三个任务分别由三个线程执行,符合咱们的预期;注意异步的方法后面都是带有Async
关键字的;
多任务结果合并计算
- 两个任务结果的合并
任务3的执行依赖于任务1、任务2的返回值,并且任务1和任务3由同一个线程执行,任务2单独一个线程执行;
示例代码如下:
CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenCombine( CompletableFuture // 任务2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR), // 任务3 (task1Result, task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }); 复制代码
执行结果如下:
completable-future-test-Thread-3开始执行任务:task2 completable-future-test-Thread-2开始执行任务:task1 正在执行任务task1 正在执行任务task2 task2执行结束 task1执行结束 completable-future-test-Thread-2开始执行任务:task3 task1结果:task1 task2结果:task2 正在执行任务task3 task3执行结束 复制代码
CompletableFuture
提供了thenCombine()
来合并另一个CompletableFuture
的执行结果,所以thenCombine()
需要两个参数,第一个参数是另一个CompletableFuture
,第二个参数会收集前两个任务的返回值,类似下面这样:
(result1,result2)->{ // 执行业务逻辑 return result3; }
如果小伙伴们想要实现任务3也是单独的线程执行的话,可以使用thenCombineAsync()
这个方法。代码如下:
CompletableFuture // 任务1 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task1"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return taskName; }, THREAD_POOL_EXECUTOR) .thenCombineAsync( CompletableFuture // 任务2 .supplyAsync( () -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task2"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return 2; }, THREAD_POOL_EXECUTOR), // 任务3 (task1Result, task2Result) -> { Thread currentThread = Thread.currentThread(); String ThreadName = currentThread.getName(); String taskName = "task3"; System.out.println(ThreadName + "开始执行任务:" + taskName); System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result); System.out.println("正在执行任务" + taskName); System.out.println(taskName + "执行结束"); return 2L; }, THREAD_POOL_EXECUTOR); 复制代码
如果任务3中不需要返回结果,可以使用thenAcceptBoth()
或thenAcceptBothAsync()
,使用方式与thenCombineAsync()
类似;