前言
前面我们讲了JUC里面线程池的相关内容,我们在代码中把任务交给线程池,实际上就是一种线程异步的操作,这能提升性能,减少接口相应时间。但是大部分情况下,我们还会在适当的时间后,看看任务有没有完成,又或者从异步线程里取回结果来进行后续操作,那么就用到了本期的内容了
一、Future的原始意义
要想搞清楚异步任务,我们必须先学一点基础,比如Future,回想我们第一次认识Future,大部分都是在使用线程池时,向线程池提交任务,看到返回值为Future类型
我们点开Future,看看它的方法
不难发现,Future就是一张凭证,我们交给线程池一个任务,线程池返回给我们一个凭证。 我们可以通过这个凭证去
- 咨询任务的执行情况
- 取消任务
- 取出任务结果
一个贴近现实的例子就是,我们去寄快递,店家会给返回给我们一个快递单号,我们后续可以通过这快递单号咨询快递到哪了,或者取走快递也需要使用这快递单号,这个给我们的快递单号就是Future
二、FutureTask 和 CompletableFuture
前面我们说了,Future的原始意义就是一个异步任务的“凭证”,可以通过凭证查询。但Future是一个接口,JUC里Future的实现类经常兼并着其他作用,所以我们对Future的实现类进行讨论时,往往会发现,这些实现类的功能已经超出了上述的“凭证”的范畴,我们现在要讨论的就是它的两个增强实现类 FutureTask 和 CompletableFuture
1.FutureTask
Future最常用的实现类就是FutureTask,顾名思义,FutureTask 是由Future 和 Task 组成,task即一个可执行的任务。我们可以看一下类图
FutureTask 同时实现了Future 和 Runnable。说白了,这个类就是个粘合剂,既可以被执行,又能存储任务的执行结果,同时还可以取消任务、查看任务的状态、获取执行结果。
即FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取执行的返回结果,等于说身兼两职,以一个对象,解决了future只能观测异步结果,runnable只能执行的问题
我们可以写个例子:
public static void main(String[] args) throws Exception { // 创建线程池 ThreadPoolExecutor tp = new ThreadPoolExecutor(3, 5, 2000L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), new ThreadFactory() { private AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r, "money caculate thread: " + threadNumber.getAndAdd(1)); } }, new ThreadPoolExecutor.CallerRunsPolicy() ); // 创建futureTask FutureTask<String> futureTask = new FutureTask(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return "返回任务结果"; } }); // 提交任务 Future<?> submit = tp.submit(futureTask); String s = futureTask.get(); System.out.println("通过futureTask拿结果: " + s); String o = (String)submit.get(); System.out.println("通过线程池凭证拿结果: " + o); }
结果如下:
当然,如果我们不手动建FutureTask,而是直接向线程池提交一个任务。其内部实际上也是会为我们自动建一个FutureTask,然后返回给我们的,这也说明了FutureTask其实是相当基础的一个类。
Future future = tp.submit(new Runnable()…)
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
2.CompletableFuture
学完了FutureTask,我们不难得出,对于像提交任务,检查任务状态,获取任务结果这些场景,FutureTask已经做到了。但这并不意味着,他没有问题,我们来看一个实际场景:
某一个接口耗时太长,需要优化。经过分析,发现该接口内有大量长耗时的任务,虽然我们必须等待这些任务都完成才算结束,但这些接口本身没有关联。因此,我们考虑使用线程池,并发的执行这些任务。
但大量的FutureTask不知道谁先执行完,主线程最终还是要线性的问询这些任务的执行结果,效率未最大化
且手动一个个的进行future.isDone 或 future.get判断,导致代码十分臃肿。
上述场景就暴露了FutureTask的不足,其实不仅于此,Future 的问题在于
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的(尽管可以设置阻塞时间)。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
为了解决上述问题,Future接口的另一个“更强功能”的实现类CompletableFuture也就应运而生,照例,我们先看类图
可以看到与FutureTask相比,CompletableFuture没有实现Runnable,但实现了CompletionStage,这个CompletionStage是JDK8引入的内容,可以对异步执行的任务分阶段,并建立任务间的依赖关系,我们看下它定义的方法:
别被这么长的方法吓到,其实这些方法都是用来建立任务执行顺序,建立任务依赖关系的。本次我们主要说的还是偏使用层面,观察这些方法的名字以及出入参,我们不难看出规律,以方法名为例,名字中带有以下关键字的
- apply:上阶段结果作下阶段参数继续执行,最后有返回结果
- accept:上阶段结果作下阶段参数继续执行,最后无返回结果
- run:上阶段执行完,下阶段执行,彼此不存在参数上的依赖
当然,我们也可以从另一个角度看,方法名带有下列关键字的:
- both:前两阶段同时执行完毕,然后执行下一阶段
- either:前两阶段任一执行完毕,执行下一阶段
出于篇幅考虑,我们在这里并不会一一解释这些方法,关于CompletableFuture的详细分析,可以参考另一篇博客:CompletableFuture使用详解
我们现在主要解决我们在前面提出的那个现实场景:
我们希望大量无关联任务并发执行,当所有任务完成的时候,我们能最快知晓,从而减少接口耗时。
这里我们就可以利用到CompletableFuture 里面的supplyAsync异步提交任务,然后使用 allOf 方法组合所有任务
具体的代码样式如下:
// 分别向线程池提交任务 CompletableFuture<T> future1 = CompletableFuture.supplyAsync(() -> a.method(), tp); CompletableFuture<T> future2 = CompletableFuture.supplyAsync(() -> b.method(), tp); CompletableFuture<T> future3 = CompletableFuture.supplyAsync(() -> c.method(), tp); CompletableFuture<Void> mixedFuture = CompletableFuture.allOf(future1, future2, future3); // 所有任务都完成 if (mixedFuture.isDone()) { // some finish job }
如果你够细心,应该能发现两个supplyAsync方法,一个需要传线程池,另一个不需要。不需要是因为CompletableFuture内会创建一个ForkJoinPool.commonPool()线程池,ForkJoinPool线程池是一个独特的线程池,我们在JUC基础(一)—— 线程池里提到过,但没有仔细分析其功能和实现,这块内容也会在后续推出详解。