多任务并行协作
任务串行执行
结果组合运算
thenCombine和thenCompose
thenAcceptBoth和runAfterBoth
acceptEither、runAfterEither
java9的改进
总结
CompletableFuture是java8引入的一个异步类,它最大的优势是可以在创建的对象中传入一个回调对象,在任务结束后(done或throw exception),自动调用回调对象的回调方法,而不用让主线程阻塞。
多任务并行协作
假如我们要做咖啡,有3个子任务可以并行执行:洗杯子、磨咖啡、烧水,这3步完成后,我们开始泡咖啡。这种需求我们一般怎么实现呢?
下面我们看一下,使用Future是怎么完成这个功能的。
先定义一个线程池:
public class MyThreadPoolExecutor { public static ExecutorService getThreadPoolExecutor(){ return ThreadPoolExecutorFactory.THREAD_POOL; } private static class ThreadPoolExecutorFactory{ private static int PROCESSOR_NUM = Runtime.getRuntime().availableProcessors(); private static final ExecutorService THREAD_POOL = new ThreadPoolExecutor(PROCESSOR_NUM, PROCESSOR_NUM + 1, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100)); }
我们再来看看使用Future制作咖啡的过程,我们把每个步骤都用一个线程来执行,必须等待前面三步都执行完成后,我们才能开始泡咖啡
public class MakeTea { public static void main(String[] args){ ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); List<Future> list = new ArrayList<>(3); list.add(executor.submit(() -> washCup())); list.add(executor.submit(() -> hotWater())); list.add(executor.submit(() -> grindCoffee())); while (true){ int i = 0; for (Future future : list){ if (future.isDone()){ i ++; } } if (i == list.size()){ break; } } executor.submit(() -> System.out.println("泡咖啡")); System.out.println("我是主线程"); } private static String washCup(){ System.out.println("洗杯子"); return "洗杯子"; } private static String hotWater(){ System.out.println("烧水"); return "烧水"; } private static String grindCoffee(){ System.out.println("磨咖啡"); return "磨咖啡"; } }
上面的代码创建3个线程后,因为等待执行结果,阻塞了主线程,等3个步骤都执行完成,主线程才能执行。输出如下:
洗杯子 烧水 磨咖啡 我是主线程 泡咖啡
如果我们使用CompletableFuture来写,要怎么实现呢?代码如下:
public static void main(String[] args){ ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor(); CompletableFuture future1 = CompletableFuture.runAsync(() -> { try { washCup(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future2 = CompletableFuture.runAsync(() -> { try { hotWater(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future3 = CompletableFuture.runAsync(() -> { try { grindCoffee(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture.allOf(future1, future2, future3).thenAccept( r -> { System.out.println("泡咖啡"); } ); System.out.println("我是主线程"); }
上面代码输出结果如下,可以看到主线程并没有被阻塞
我是主线程 洗杯子 烧水 磨咖啡 泡咖啡
上面的示例是多个任务之间的调度,最后一个任务必须等之前的3个任务都完成后(allOf),才能执行。如果前面3个任务只有一个完成最后一个任务就可以执行,那就用anyOf方法,把上面代码中allOf改成anyOf,其他代码不变,执行结果如下:
我是主线程 洗杯子 泡咖啡 烧水 磨咖啡
注意:
1.anyOf方法返回的是Object对象而不是Void,这是跟allOf的一个很大的区别,我们要配置异常情况的回调对象,在allOf创建的CompletableFuture中是不可以的。看下面代码
CompletableFuture<Object> future4 = CompletableFuture.anyOf(future1, future2, future3); future4.thenApply( r -> { System.out.println("泡咖啡"); return null; } ); future4.exceptionally(e -> { e.printStackTrace(); return null; });
2.supplyAsync和runAsync区别是前者创建的是CompletableFuture<U>,后者创建的是CompletableFuture<Void>