CompletableFuture和CompletionService可以让我们更好的编写并发代码,更专注于编写我们自己的业务逻辑,今天让我们一起来学习一下吧
一.CompletionService
为什么需要completionService呢,假如有这样一个需求,并发请求3个接口,得到某一个接口的结果后,就可以执行下一步业务,不需要等待另外两个请求的返回。
方案一:使用Future+线程池 + 阻塞队列实现
需要把Future.get通过线程池放到LinkedBlockingQueue阻塞队列里,还是有点复杂的。
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); for(int i=0;i<3;i++) { int finalI = i; Future<String> future = executorService.submit(() -> { //請求接口1 try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } return "接口"+ finalI; }); executorService.execute(()-> { try { blockingQueue.put(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } //优先获取先返回的結果,做后续逻辑 System.out.println(blockingQueue.take());
方案二:通过completionService实现
使用completionService,可以降低代码的复杂度,使得批量执行任务管理更加简单,底层其实也是线程池+LinkedBlockingQueue实现
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletionService<String> completionService = new ExecutorCompletionService(executorService); for (int i = 0; i < 3; i++) { int finalI = i; completionService.submit(() -> { //請求接口1 try { Thread.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } return "接口" + finalI; }); } //优先获取先返回的結果,做后续逻辑 try { System.out.println(completionService.take().get()); } catch (ExecutionException e) { e.printStackTrace(); } }
怎么创建completionService,通过ExecutorCompletionService创建
//传入线程池,不传入队列,默认使用LinkedBlockingQueue无界队列,存放future返回的结果 ExecutorCompletionService(Executor executor); //传入线程池+自定义队列 ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。
怎么使用completionService,通过submit方法提交任务,通过take或者poll方法获取结果
Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
二.CompletableFuture
有这么一个需求,泡茶例子:有三个线程,线程1负责烧开水,线程2洗茶具,线程3负责泡茶(需要等待线程1和线程2完成),下面代码是用CompletableFuture实现,是不是比较简单,基本只需要写业务代码就可以了,他们怎么合作协调的完全不用关。
//任务1:烧开水 CompletableFuture<Boolean> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("T1:烧开水..." + ":" + Thread.currentThread().getName()); sleep(15000); return true; }); //任务2:洗茶具 CompletableFuture<Boolean> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2:洗茶具..." + ":" + Thread.currentThread().getName()); sleep(1000); return true; }); //任务3:任务1和任务2完成后执行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2, (t1, t2) -> { if (t1 && t2) { System.out.println("T3:泡茶..." + ":" + Thread.currentThread().getName()); } return "上茶"; }); //等待任务3执行结果 System.out.println(f3.join());
让我们一起来学习一下怎么使用CompletableFuture吧
怎么创建CompletableFuture对象
分成两个方法run和supply,run方法没有返回值,supply方法可以通过get获取返回值,不传入线程池使用默认线程池ForkJoinPool,所有CompletableFuture都会公用同一个线程池,避免有些任务是比较慢的操作,影响到其他任务,所以建议不同的业务使用不同的线程池。
//使用默认线程池 static CompletableFuture<Void> runAsync(Runnable runnable) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //可以指定线程池 static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture的任务组合功能
CompletableFuture实现了CompletionStage接口,提供了组合任务的功能,可以讲多个任务串行化执行,多个任务结果聚合等功能
1. 描述串行关系
带Async的方法会异步执行fn,action,consumer这些函数,相反不带Async则是同步执行
Apply:可以有入参和返回值 ;Accept可以支持入参;Run:不支持入参和返回值
CompletionStage thenApply(fn); CompletionStage thenApplyAsync(fn); CompletionStage thenAccept(consumer); CompletionStage thenAcceptAsync(consumer); CompletionStage thenRun(action); CompletionStage thenRunAsync(action); CompletionStage thenCompose(fn); CompletionStage thenComposeAsync(fn);
举个例子:下面的代码是先返回我是java,再拼接上小面,最后转成大小字母返回。
CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "我是java") //① .thenApply(s -> s + " 小面") //② .thenApply(String::toUpperCase);//③ System.out.println(f0.join());
2. 描述 AND 汇聚关系
同样的方法带Async的是异步执行,否则是同步执行
thenCombine:得到两个任务的结果,作为fn函数的输入值,有返回值
AcceptBoth:得到两个任务的结果,作为consumer函数的输入值,没有返回值
runAfterBoth:得到两个任务的结果,没有入参和返回值
CompletionStage<R> thenCombine(other, fn); CompletionStage<R> thenCombineAsync(other, fn); CompletionStage<Void> thenAcceptBoth(other, consumer); CompletionStage<Void> thenAcceptBothAsync(other, consumer); CompletionStage<Void> runAfterBoth(other, action); CompletionStage<Void> runAfterBothAsync(other, action);
例子可以看上面泡茶的例子使用了thenCombine
3. 描述 OR 汇聚关系
同样的方法带Async的是异步执行,否则是同步执行
applyToEither:得到任意一个任务的结果,作为fn函数的输入值,有返回值
acceptEither:得到任意一个任务的结果,作为consumer函数的输入值,没有返回值
runAfterEither:得到任意一个任务的结果,没有入参和返回值
CompletionStage applyToEither(other, fn); CompletionStage applyToEitherAsync(other, fn); CompletionStage acceptEither(other, consumer); CompletionStage acceptEitherAsync(other, consumer); CompletionStage runAfterEither(other, action); CompletionStage runAfterEitherAsync(other, action);
下面例子演示了怎么使用applyToEither
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{ int t = new Random().nextInt(10); sleep(t); return "小面1摸鱼了:"+ String.valueOf(t) + "ms"; }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ int t = new Random().nextInt(10); sleep(t); return "小面2摸鱼了:"+ String.valueOf(t) + "ms"; }); CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s); System.out.println(f3.join());
4.异常处理
当我们使用CompletableFuture编写异步代码时,出现运行时错误时,我们怎么知道呢?
CompletionStage接口也是很好的支持了异常处理的功能
exceptionally:出现异常就会调用
whenComplete和handle:类似于try finally 的finally方法,无论是否报错都会执行
CompletionStage exceptionally(fn); CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn);
下面的例子:supplyAsync发生了一次,exceptionally就会执行,最后执行whenComplete方法
CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> (7 / 0)) .thenApply(r -> r * 10) .exceptionally(e -> 0) .whenComplete((a, t) -> System.out.println(a)); System.out.println(f0.join());
三.小结
我们来总结一下,我们学习了CompletionService 和CompletableFuture的创建和常用的方法,以及在并发编程中的运用,你学会了吗?