Future和Callable接口
- Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)
- 一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
- 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。
- 有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
FutureTask实现类
- 在源码可以看到,他既继承了
RunnableFuture
接口,也在构造方法中实现了Callable
接口(有返回值、可抛出异常)和Runnable
接口
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
完成上面目的的代码 - 多线程/有返回/异步
一个主线程,一个mythread|步执行了|返回了"hello callable"
public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(new MyThread()); Thread t1 = new Thread(futureTask,"t1"); t1.start(); System.out.println(futureTask.get());//接收返回值 } } class MyThread implements Callable<String>{ @Override public String call() throws Exception { System.out.println("-----come in call() ----异步执行"); return "hello Callable 返回值"; } } //结果 //-----come in call() ----异步执行 //hello Callable 返回值
Future到CompletableFuture
Future优点
- future+线程池异步多线程任务配合,能显著提高程序的执行效率。
- 方案一,3个任务1个main线程处理,大概1130ms
- 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概530毫秒。
Future缺点
get()阻塞
一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)
public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in"); try { TimeUnit.SECONDS.sleep(5);//暂停几秒 } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); //-----------------------------------------------------------注意顺序 System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了"); System.out.println(futureTask.get()); //----------------------------------------------------------注意顺序 } } //main -------主线程忙其他任务了 //t1 ------副线程come in public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in"); try { TimeUnit.SECONDS.sleep(5);//暂停几秒 } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); //-----------------------------------------------------------注意顺序 System.out.println(futureTask.get()); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了"); //----------------------------------------------------------注意顺序 } } //t1 ------副线程come in //-------------------5秒后才出现下面的结果-------------说明一旦调用get()方法直接去找副线程了,阻塞了主线程 //task over //main -------主线程忙其他任务了
isDone() 轮循
利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu
public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask<String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in"); try { TimeUnit.SECONDS.sleep(5);//暂停几秒 } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了"); //1------- System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等3秒,过了3秒直接抛出异常 //2-------更健壮的方式-------轮询方法---等副线程拿到才去get() //但是也会消耗cpu资源 while(true){ if(futureTask.isDone()){ System.out.println(futureTask.get()); break; }else{ //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("正在处理中------------正在处理中"); } } } } //main -------主线程忙其他任务了 //t1 ------副线程come in //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //正在处理中------------正在处理中 //task over
CompletableFuture基本介绍
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Completable Future的方法。
它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future
和Completion Stage
接口
CompletionStage
Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段
核心的四个静态方法(分为两组)
- 关键就是 |有没有返回值|是否用了线程池|
- 参数说明:
- 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
- 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。
runAsync无返回值
runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable)
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); //停顿几秒线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(voidCompletableFuture.get()); } } //ForkJoinPool.commonPool-worker-9 //默认的线程池 //null --- 没有返回值
runAsync+线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); //停顿几秒线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },executorService); System.out.println(voidCompletableFuture.get()); } } //pool-1-thread-1 ----指定的线程池 //null ----没有返回值
supplyAsync有返回值
supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池 CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "helllo supplyasync"; }); System.out.println(objectCompletableFuture.get()); } } //ForkJoinPool.commonPool-worker-9---------默认的线程池 //helllo supplyasync-------------supplyasync有返回值了
supplyAsync+线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池 CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "helllo supplyasync"; },executorService); System.out.println(objectCompletableFuture.get()); } } //ForkJoinPool.commonPool-worker-9---------默认的线程池 //helllo supplyasync-------------supplyasync有返回值了
CompletableFuture使用演示(日常使用)
基本功能
CompletableFuture
可以完成Future
的功能
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"----副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1秒钟后出结果"+result); return result; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); System.out.println(objectCompletableFuture.get()); } } //main线程先去忙其他任务 //ForkJoinPool.commonPool-worker-9----副线程come in //1秒钟后出结果6 //6
减少阻塞和轮询whenComplete
CompletableFuture
通过whenComplete
来减少阻塞和轮询(自动回调)
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; }).whenComplete((v,e) -> {//没有异常,v是值,e是异常 // 自动回调,不需要get if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } }).exceptionally(e->{//有异常的情况 e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } //ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool) //main线程先去忙其他任务 //------------------计算完成,更新系统updataValue3
假如换用自定义线程池
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常 if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } }).exceptionally(e->{//有异常的情况 e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } //pool-1-thread-1--------副线程come in //main线程先去忙其他任务 //------------------计算完成,更新系统updataValue6
异常情况的展示,设置一个异常 int i = 10 / 0 ;
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----结果---异常判断值---"+result); //---------------------异常情况的演示-------------------------------------- if(result > 2){ int i = 10 / 0 ;//我们主动的给一个异常情况 } //------------------------------------------------------------------ return result; },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常 if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } }).exceptionally(e->{//有异常的情况 e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } //pool-1-thread-1--------副线程come in //main线程先去忙其他任务 //-----结果---异常判断值---4 (这里 4 >2了,直接抛出异常) //异常情况java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero // at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) // at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) // at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) // at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) // at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) // at java.lang.Thread.run(Thread.java:748) //Caused by: java.lang.ArithmeticException: / by zero // at com.zhang.admin.controller.CompletableFutureUseDemo.lambda$main$0(CompletableFutureUseDemo.java:19) // at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) // ... 3 more
剑指JUC原理-17.CompletableFuture(下):https://developer.aliyun.com/article/1413662