大纲
1.Runnable接口与Callable接口
(1)Runnable接口实现异步任务
(2)Callable接口实现异步任务
2.Future模式
(1)Future模式的概念
(2)Future接口的使用
(3)FutureTask类的使用
3.CompletableFuture的使用和异步编程
(1)使用Future时的问题
(2)CompletableFuture的使用例子
(3)CompletableFuture的使用场景
(4)CompletableFuture的创建异步任务
(5)CompletableFuture的简单任务异步回调
(6)CompletableFuture的多个任务组合处理
(7)CompletableFuture的使用注意事项
1.Runnable接口与Callable接口
(1)Runnable接口实现异步任务
(2)Callable接口实现异步任务
(1)Runnable接口实现异步任务
也就是通过创建实现了Runnable接口的Thread线程来实现异步任务。
Runnable接口实现的异步任务存在的问题:
一.Runnable接口不支持获取返回值
二.Runnable接口不支持抛出异常
@FunctionalInterface public interface Runnable { public abstract void run(); } public class Thread implements Runnable { ... private Runnable target; public Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); } @Override public void run() { if (target != null) { target.run(); } } ... } @RunWith(SpringRunner.class) @SpringBootTest public class Test { @Test public void testNewThread() { Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("实现的异步任务"); } }); t1.start(); } }
(2)Callable接口实现异步任务
Callable接口需要与Future和ExecutorService结合使用:通过ExecutorService的submit()方法提交一个实现Callable接口的任务,然后ExecutorService的submit()方法会返回一个实现Future接口的对象,接着调用Future接口的get()方法就可以获取异步任务的结果。
public interface ExecutorService extends Executor { ... //Submits a value-returning task for execution and returns a Future representing the pending results of the task. //The Future's get method will return the task's result upon successful completion. //@param task the task to submit //@param <T> the type of the task's result //@return a Future representing pending completion of the task <T> Future<T> submit(Callable<T> task); ... } public interface Future<V> { ... //Waits if necessary for the computation to complete, and then retrieves its result. V get() throws InterruptedException, ExecutionException; //Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; ... }
2.Future模式
(1)Future模式的概念
(2)Future接口的使用
(3)FutureTask类的使用
(1)Future模式的概念
当前线程有一个任务,提交给了Future,由Future来完成这个任务,在此期间当前线程可以处理其他事情了。一段时间后,当前线程就可以从Future中获取结果。
(2)Future接口的使用
一.Future接口源码
二.普通模式计算1000次1到1亿的和
三.Future模式计算1000次1到1亿的和
一.Future接口源码
Future就是对实现Runnable或Callable接口的任务进行查询、中断、获取。
public interface Future<V> { //用来取消任务,取消成功则返回true,取消失败则返回false //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务 //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true boolean cancel(boolean mayInterruptIfRunning); //表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true boolean isCancelled(); //表示任务是否已经完成,若任务完成,则返回true boolean isDone(); //获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果 V get() throws InterruptedException, ExecutionException; //获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
二.普通模式计算1000次1到1亿的和
public class NormalTest { //普通模式计算1000次1到1亿的和 public static void main(String[] args) { long start = System.currentTimeMillis(); List<Integer> retList = new ArrayList<>(); //计算1000次1至1亿的和 for(int i = 0; i < 1000; i++) { retList.add(Calc.cal(100000000)); } System.out.println("耗时: " + (System.currentTimeMillis() - start)); for (int i = 0; i < 1000; i++) { try { Integer result = retList.get(i); System.out.println("第" + i + "个结果: " + result); } catch (Exception e) { } } System.out.println("耗时: " + (System.currentTimeMillis() - start)); } public static class Calc implements Callable<Integer> { @Override public Integer call() throws Exception { return cal(10000); } public static int cal (int num) { int sum = 0; for (int i = 0; i < num; i++) { sum += i; } return sum; } } } -------------------------------------------------- 执行结果: 耗时: 43659 第0个结果: 887459712 第1个结果: 887459712 第2个结果: 887459712 ... 第999个结果: 887459712 耗时: 43688
三.Future模式计算1000次1到1亿的和
public class FutureTest { //Future模式计算1000次1到1亿的和 public static void main(String[] args) { long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<Integer>> futureList = new ArrayList<>(); //计算1000次1至1亿的和 for (int i = 0; i < 1000; i++) { //调度执行 futureList.add(executorService.submit(new Calc())); } System.out.println("耗时: " + (System.currentTimeMillis() - start)); for (int i = 0; i < 1000; i++) { try { Integer result = futureList.get(i).get(); System.out.println("第" + i + "个结果: " + result); } catch (InterruptedException | ExecutionException e) { } } System.out.println("耗时: " + (System.currentTimeMillis() - start)); } public static class Calc implements Callable<Integer> { @Override public Integer call() throws Exception { return cal(100000000); } public static int cal (int num) { int sum = 0; for (int i = 0; i < num; i++) { sum += i; } return sum; } } } -------------------------------------------------- 执行结果: 耗时: 12058 第0个结果: 887459712 第1个结果: 887459712 ... 第999个结果: 887459712 耗时: 12405
(3)FutureTask类的使用
一.FutureTask类的简介
二.Callable + Future获取异步任务的执行结果
三.Callable + FutureTask获取异步任务的结果
一.FutureTask类的简介
FutureTask类实现了RunnableFuture接口,而RunnableFuture接口又继承了Runnable接口和Future接口。所以FutureTask类既可以作为Runnable被线程执行,又可以作为Future得到Callable的run()方法的返回值。同时,FutureTask类是Future接口的唯一实现类。
public class FutureTask<V> implements RunnableFuture<V> { ... ... } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
二.Callable + Future获取异步任务的执行结果
//Callable+Future获取执行结果 public class FutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果" + result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } } class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for (int i = 0; i < 100; i++) { sum += i; } return sum; } }
三.Callable + FutureTask获取异步任务的结果
//Callable + FutureTask获取执行结果 public class FutureTest { public static void main(String[] args) { //第一种方式 ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<>(task); executor.submit(futureTask); executor.shutdown(); //第二种方式 //注意这种方式和第一种方式效果是类似的,只不过之前使用的是ExecutorService,现在使用的是Thread //Task task = new Task(); //FutureTask<Integer> futureTask = new FutureTask<Integer>(task); //Thread thread = new Thread(futureTask); //thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果" + futureTask.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
3.CompletableFuture的使用和异步编程
(1)使用Future时的问题
(2)CompletableFuture的使用例子
(3)CompletableFuture的使用场景
(4)CompletableFuture的创建异步任务
(5)CompletableFuture的简单任务异步回调
(6)CompletableFuture的多个任务组合处理
(7)CompletableFuture的使用注意事项
(1)使用Future时的问题
一.通过Future获取结果演示
如果主线程需要执行一个很耗时的计算任务,那么可通过Future把该任务放到异步线程执行,让主线程继续处理其他任务。当这个耗时的任务处理完成后,再让主线程通过Future获取计算结果。
如下所示,有两个服务:
public class UserInfoService { public UserInfo getUserInfo(Long userId) throws InterruptedException { Thread.sleep(300);//模拟调用耗时 return new UserInfo("...");//一般是查数据库,或者远程调用返回 } } public class OrderService { public OrderInfo getOrderInfo(long userId) throws InterruptedException { Thread.sleep(500);//模拟调用耗时 return new OrderInfo("..."); } }
接下来,演示在主线程中是如何使用Future来进行异步调用的。
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); UserInfoService userInfoService = new UserInfoService(); OrderService orderService = new OrderService(); long userId = 1L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() { @Override public UserInfo call() throws Exception { return userInfoService.getUserInfo(userId); } }); //提交任务给线程池异步执行 executorService.submit(userInfoFutureTask); //模拟主线程其它操作耗时 Thread.sleep(300); //调用订单服务获取用户订单信息 FutureTask<OrderInfo> orderInfoFutureTask = new FutureTask<>(new Callable<OrderInfo>() { @Override public MedalInfo call() throws Exception { return medalService.getMedalInfo(userId); } }); //提交任务给线程池异步执行 executorService.submit(medalInfoFutureTask); UserInfo userInfo = userInfoFutureTask.get();//获取用户信息结果 OrderInfo orderInfo = orderInfoFutureTask.get();//获取订单信息结果 System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } } //总共用时806ms
如果不使用Future进行并行异步调用,而是在主线程串行执行,那么耗时大约为300 + 500 + 300 = 1100ms。
二.Future获取结果时存在的问题
可见,Future + 线程池异步配合,提高了程序的执行效率。但是由于根据Future获取结果的方式不是很友好,所以只能通过阻塞或者轮询的方式来得到任务的结果。
方式一:
通过Future提供的get()方法,进行阻塞调用。在主线程获取到异步任务的执行结果前,get()方法会一直阻塞。
方式二:
通过Future提供的isDone()方法,进行轮询调用。可以让主线程在程序中轮询isDone()方法来查询异步任务的执行结果。
阻塞的方式会违背异步编程的理念,轮询的方式又会空耗CPU资源,因此JDK8设计出了CompletableFuture。
CompletableFuture提供了一种类似观察者模式的机制,可以让异步任务执行完成后通知主线程。
三.使用Future的问题总结
首先需要单独创建一个线程池来提交Callable任务。然后如果使用Future的get()方法获取结果,那么需要进行阻塞调用。如果使用Future的isDone()方法获取结果,那么需要进行轮询调用。
(2)CompletableFuture的使用例子
如下所示,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync()方法,提供了异步执行的功能,线程池也不用单独创建了,实际上使用的默认线程池是ForkJoinPool.commonPool。
public class CompletableFutureTest { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { UserInfoService userInfoService = new UserInfoService(); OrderService orderService = new OrderService(); long userId = 1L; long startTime = System.currentTimeMillis(); //调用用户服务获取用户基本信息 CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId)); //模拟主线程其它操作耗时 Thread.sleep(300); //调用订单服务获取用户订单信息 CompletableFuture<OrderInfo> completableOrderInfoFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(userId)); //获取个人信息结果 UserInfo userInfo = completableUserInfoFuture.get(); //获取订单信息结果 OrderInfo orderInfo = completableOrderInfoFuture.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
(3)CompletableFuture的使用场景
一.创建异步任务
二.简单任务异步回调
三.多个任务组合处理
(4)CompletableFuture的创建异步任务
CompletableFuture创建异步任务的方法:supplyAsync()和runAsync();
一.supplyAsync()方法执行CompletableFuture任务,有返回值。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool() //with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param <U> the function's return type //@return the new CompletableFuture //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor //with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletableFuture //使用自定义的线程池,根据supplier构建执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } ... }
二.runAsync()方法执行CompletableFuture任务,没有返回值。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool() //after it runs the given action. //@param runnable the action to run before completing the returned CompletableFuture //@return the new CompletableFuture //使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor //after it runs the given action. //@param runnable the action to run before completing the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@return the new CompletableFuture //使用自定义的线程池,根据runnable构建执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } ... }
使用示例如下:
public class CompletableFuture { public static void main(String[] args) { //自定义线程池 ExecutorService executor = Executors.newCachedThreadPool(); //runAsync的使用,没有返回值 CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("没有返回值"), executor); //supplyAsync的使用,有返回值 CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.print("有返回值"); return "OK"; }, executor); //runAsync的future没有返回值,输出null System.out.println(runFuture.join()); //supplyAsync的future,有返回值 System.out.println(supplyFuture.join()); //关闭线程池 executor.shutdown(); } }
(5)CompletableFuture的简单任务异步回调
一.thenRun()和thenRunAsync()方法
CompletableFuture的thenRun()方法就是:执行完第一个任务后,再执行第二个任务。也就是当某个任务执行完成后,会执行设置给该任务的回调方法。但是前后两个任务没有传递参数,第二个任务也没有返回值。
thenRun()和thenRunAsync()方法的区别是:如果执行第一个任务时,传入了一个自定义线程池。当调用thenRun()方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。当调用thenRunAsync()方法执行第二个任务时,则第一个任务使用传入的线程池,第二个任务使用ForkJoin线程池。也就是说,thenRunAsync()会使用ForkJoin线程池来异步执行任务。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); ... public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); } ... } public class FutureThenRunTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("先执行第一个任务"); return "第一个任务执行完成"; }); CompletableFuture thenRunFuture = orgFuture.thenRun(() -> { System.out.println("接着执行第二个任务"); }); System.out.println("输出" + thenRunFuture.get()); } } //执行程序输出的结果如下: //先执行第一个任务 //接着执行第二个任务 //输出null
二.thenAccept()和thenAcceptAsync()方法
CompletableFuture的thenAccept()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,但是第二个任务是没有返回值的。
CompletableFuture的thenAcceptAsync()方法会使用ForkJoin线程池来异步执行任务。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); ... public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } ... } public class FutureThenAcceptTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("执行第一个任务"); return "第一个任务的返回值"; }); CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> { System.out.println("执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); } }); System.out.println("输出" + thenAcceptFuture.get()); } } //执行程序输出的结果如下: //执行第一个任务 //执行第二个任务 //收到传入的第一个任务的返回值 //输出null
三.thenApply()和thenApplyAsync()方法
CompletableFuture的thenApply()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,并且第二个任务是有返回值的。
CompletableFuture的thenApplyAsync()方法会使用ForkJoin线程池来异步执行任务。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); ... public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } ... } public class FutureThenApplyTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("执行第一个任务"); return "第一个任务的返回值"; }); CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> { if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); System.out.println("执行第二个任务"); return "第二个任务的返回值"; } return "第二个任务的返回值"; }); System.out.println("输出" + thenApplyFuture.get()); } } //执行程序输出的结果如下: //执行第一个任务 //收到传入的第一个任务的返回值 //执行第二个任务 //输出第二个任务的返回值
四.exceptionally()方法
CompletableFuture的exceptionally()方法表示:某个任务执行异常时,才执行的回调方法。并且将抛出的异常作为参数,传递到回调方法中。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) { return uniExceptionallyStage(fn); } ... } public class FutureExceptionTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("异步执行任务时抛出异常"); throw new RuntimeException(); }); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "返回处理异步执行任务抛出的异常的结果"; }); System.out.println(exceptionFuture.get()); } } //执行程序输出的结果如下: //异步执行任务时抛出异常 //java.util.concurrent.CompletionException: java.lang.RuntimeException //返回处理异步执行任务抛出的异常的结果
五.whenComplete()和whenCompleteAsync()
CompletableFuture的whenComplete()方法表示:某个任务执行完成后,紧接着执行的回调方法无返回值。whenComplete()方法返回的CompletableFuture的result是上个任务的结果。
CompletableFuture的whenCompleteAsync()方法会使用ForkJoin线程池来异步执行回调方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); ... public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } ... } public class FutureWhenTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("异步执行第一个任务"); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值"; }); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("异步执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); } System.out.println("whenComplete()执行的回调方法没有返回值"); }); System.out.println("输出" + rstFuture.get()); } } //执行程序输出的结果如下: //异步执行第一个任务 //异步执行第二个任务 //收到传入的第一个任务的返回值 //whenComplete()执行的回调方法没有返回值 //输出第一个任务的返回值
六.handle()和handleAsync()方法
CompletableFuture的handle()方法表示:异步任务执行完成后,紧接着执行的回调方法是有返回值的。handle()方法返回的CompletableFuture的result是回调方法执行的结果。
CompletableFuture的handleAsync()方法会使用ForkJoin线程池来异步执行回调方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); ... public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(null, fn); } public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(asyncPool, fn); } ... } public class FutureHandlerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println(""); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } return "第一个任务的返回值"; }); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("执行第二个任务"); if ("第一个任务的返回值".equals(a)) { System.out.println("收到传入的第一个任务的返回值"); return "第二个任务的返回值"; } return "第二个任务的返回值"; }); System.out.println("输出" + rstFuture.get()); } } //执行程序输出的结果如下: //执行第二个任务 //收到传入的第一个任务的返回值 //输出第二个任务的返回值
(6)CompletableFuture的多个任务组合处理
一.AND组合关系
thenCombine()、thenAcceptBoth()、runAfterBoth()都表示:将两个CompletableFuture任务组合起来,只有这两个任务都正常执行完后,才会执行后面的回调方法。区别如下:
thenCombine()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。
thenAcceptBoth()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,但指定的回调方法无返回值。
runAfterBoth()方法则不会把两个任务的执行结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。
public class ThenCombineTest { public static void main(String[] args) throws Exception { CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> { System.out.println("第一个异步任务要执行3秒"); try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一个异步任务执行完毕"); return "第一个任务的返回值"; }); ExecutorService executor = Executors.newFixedThreadPool(2); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("第二个异步任务要执行2秒"); try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二个异步任务执行完毕"); return "第二个任务的返回值"; }, executor).thenCombineAsync(firstFuture, (secondResult, firstResult) -> { System.out.println("两个异步任务都执行完毕后才能执行这里"); System.out.println("接收到" + firstResult); System.out.println("接收到" + secondResult); return "两个异步任务都执行完后执行的回调的返回值"; }, executor); System.out.println("输出" + future.join()); executor.shutdown(); } } //执行程序输出的结果如下: //第一个异步任务要执行3秒 //第二个异步任务要执行2秒 //第二个异步任务执行完毕 //第一个异步任务执行完毕 //两个异步任务都执行完毕后才能执行这里 //接收到第一个任务的返回值 //接收到第二个任务的返回值 //输出两个异步任务都执行完后执行的回调的返回值
二.OR组合关系
applyToEither()、acceptEither()、runAfterEither()都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。区别如下:
applyToEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。
acceptEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法无返回值。
runAfterEither()方法不会把已经执行完成的任务的结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。
public class AcceptEitherTest { public static void main(String[] args) { //第一个异步任务,休眠2秒,保证它执行晚点 CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(2000L); System.out.println("执行完第一个任务"); } catch (Exception e) { return "执行第一个任务异常"; } return "返回第一个任务的结果"; }); ExecutorService executor = Executors.newSingleThreadExecutor(); //第二个异步任务 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("执行完第二个任务"); return "返回第二个任务的结果"; }, executor).acceptEitherAsync(first, (lastResult) -> { System.out.println("执行完第一个或第二个任务后的回调"); System.out.println("获取到传入的先执行完的任务的返回结果是:" + lastResult); }, executor); executor.shutdown(); } } //执行程序输出的结果如下: //执行完第二个任务 //执行完第一个或第二个任务后的回调 //获取到传入的先执行完的任务的返回结果是:返回第二个任务的结果
三.anyOf
任意一个任务执行完,就执行anyOf()方法返回的CompletableFuture。如果执行的任务异常,anyOf()方法返回的CompletableFuture在执行get()方法时,会抛出异常。
public class AnyOfFutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务A执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("任务B执行完了"); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m, k) -> { System.out.println("finish"); }); anyOfFuture.join(); } } //执行程序输出的结果如下: //任务B执行完了 //finish
四.allOf
所有任务都执行完成后,才执行allOf()方法返回的CompletableFuture。如果任意一个任务异常,allOf()方法返回的CompletableFuture在执行get()方法时,会抛出异常。
public class allOfFutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(() -> { System.out.println("任务A执行完了"); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("任务B执行完了"); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m, k) -> { System.out.println("finish:" + m + "," + k); }); } } //执行程序输出的结果如下: //任务A执行完了 //任务B执行完了 //finish: null,null
五.thenCompose
thenCompose()方法会在某个任务执行完成后,将该任务的执行结果作为方法入参去执行指定的方法。thenCompose()方法会返回一个新的CompletableFuture实例。
public class ThenComposeTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务"); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("执行第二个任务"); return "返回第二个任务的结果"; }, executor).thenComposeAsync(data -> { System.out.println("执行第三个任务"); System.out.println("收到传入的:" + data); return f; }, executor); System.out.println(future.join()); executor.shutdown(); } } //执行程序输出的结果如下: //执行第二个任务 //执行第三个任务 //收到传入的:返回第二个任务的结果 //第一个任务
(7)CompletableFuture的使用注意事项
一.Future需要获取返回值,才能获取异常信息
public class ThenComposeTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = newThreadPoolExecutor( 5, 10, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10) ); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int a = 0; int b = 666; int c = b / a; return true; },executorService).thenAccept((a) -> { System.out.println(a); }); //如果如下这一行get()方法,是看不到异常信息的 //future.get(); } }
二.CompletableFuture的get()方法是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
三.默认线程池的注意点
CompletableFuture代码中使用了默认的线程池,处理的线程个数是机器CPU核数 - 1。在大量请求过来时,如果处理逻辑复杂,那么响应就会很慢。所以一般建议使用自定义线程池,优化线程池配置参数。
四.自定义线程池时注意饱和策略
由于CompletableFuture的get()方法是阻塞的,所以一般建议使用类似future.get(3, TimeUnit.SECONDS),并且一般建议使用自定义线程池。
但如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,那么当线程池饱和时,会直接丢弃任务,不会抛出异常。
因此建议CompletableFuture线程池的拒绝策略最好使用AbortPolicy,然后对耗时的异步线程做好线程池隔离。