剑指JUC原理-17.CompletableFuture(上):https://developer.aliyun.com/article/1413660
CompletableFuture优点总结
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法。
CompletableFuture案例精讲
编程必备技能准备
函数式接口
函数式接口的定义:
- 任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。
public interface Runnable{ public abstract void run(); }
常见的函数式接口
- Runnable
@FunctionalInterface public interface Runnable { public abstract void run(); }
- Function
@FunctionalInterface public interface Function<T, R> { R apply(T t); }
- Consumer
@FunctionalInterface public interface Consumer<T> { void accept(T t); }
- Supplier
@FunctionalInterface public interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
- Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
@FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u); }
函数式接口名称 | 方法名称 | 参数 | 返回值 |
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1个参数 | 有返回值 |
Consume | accept | 1个参数 | 无返回值 |
Supplier | get | 没有参数 | 有返回值 |
Biconsumer | accept | 2个参数 | 无返回值 |
链式调用|链式编程|链式写法
public class Chain { public static void main(String[] args) { //-------------------老式写法------------ // Student student = new Student(); // student.setId(1); // student.setMajor("cs"); // student.setName("小卡"); new Student().setId(1).setName("大卡").setMajor("cs"); } } @NoArgsConstructor @AllArgsConstructor @Data @Accessors(chain = true)//开启链式编程 class Student{ private int id; private String name; private String major; }
join和get对比
- 功能几乎一样,区别在于编码时是否需要抛出异常
- get()方法需要抛出异常
- join()方法不需要抛出异常
public class Chain { public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 12345"; }); System.out.println(completableFuture.get()); } } public class Chain { public static void main(String[] args) {//抛出异常 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "hello 12345"; }); System.out.println(completableFuture.join()); } }
实战精讲-比价网站案例
需求
1需求说明 1.1同一款产品,同时搜索出同款产品在各大电商平台的售价; 1.2同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少 2输出返回: 出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String> 《mysql》in jd price is 88.05 《mysql》in dang dang price is 86.11 《mysql》in tao bao price is 90.43 3解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表 1 stepbystep , 按部就班, 查完京东查淘宝, 查完淘宝查天猫...... 2 all in ,万箭齐发,一口气多线程异步任务同时查询。。。
基本框架搭建
- 相当于是一个一个按部就班
public class Case { static List<NetMall> list = Arrays.asList( new NetMall("jd"), new NetMall("dangdang"), new NetMall("taobao") ); public static List<String> getPrice(List<NetMall> list,String productName){ return list .stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符 .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(),//第一个% netMall.calcPrice(productName))).collect(Collectors.toList());//第二个% } public static void main(String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql"); for(String element:list1){ System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("---当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒"); } } class NetMall{ @Getter private String netMallName; public NetMall(String netMallName){ this.netMallName = netMallName; } public double calcPrice(String productName){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格 } } //mysql in jd price is 110.48 //mysql in dangdang price is 109.06 //mysql in taobao price is 110.96 //---当前操作花费时间----costTime:3098毫秒
从功能到性能:利用CompletableFuture
- 这里是利用异步线程,万箭齐发
- 此处用了两步流式编程。
- 性能差距巨大
public class Case { static List<NetMall> list = Arrays.asList( new NetMall("jd"), new NetMall("dangdang"), new NetMall("taobao") ); public static List<String> getPrice(List<NetMall> list,String productName){ return list .stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符 .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(),//第一个% netMall.calcPrice(productName))).collect(Collectors.toList());//第二个% } //从功能到性能 public static List<String> getPricesByCompletableFuture(List<NetMall> list,String productName){ return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))))//Stream<CompletableFuture<String>> .collect(Collectors.toList())//List<CompletablFuture<String>> .stream()//Stream<CompletableFuture<String> .map(s->s.join())//Stream<String> .collect(Collectors.toList()); } public static void main(String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql"); for(String element:list1){ System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("--普通版----当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒"); System.out.println("------------------------------分割----------------------"); startTime = System.currentTimeMillis(); List<String> list2 = getPricesByCompletableFuture(list, "mysql"); for(String element:list2){ System.out.println(element); } endTime = System.currentTimeMillis(); System.out.println("--性能版-当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒"); } } class NetMall{ @Getter private String netMallName; public NetMall(String netMallName){ this.netMallName = netMallName; } public double calcPrice(String productName){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格 } } //mysql in jd price is 109.49 //mysql in dangdang price is 110.85 //mysql in taobao price is 110.32 //--普通版----当前操作花费时间----costTime:3124毫秒 //------------------------------分割---------------------- //mysql in jd price is 109.34 //mysql in dangdang price is 109.02 //mysql in taobao price is 110.37 //--性能版-当前操作花费时间----costTime:1000毫秒
CompletableFuture常用API
获得结果和触发计算
获取结果
- public T get() 不见不散,容易阻塞
- public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
- public T join() 类似于get(),区别在于是否需要抛出异常
- public T getNow(T valueIfAbsent)
没有计算完成的情况下,给一个替代结果
立即获取结果不阻塞
- 计算完,返回计算完成后的结果
- 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
public boolean complete(T value) 是否立即打断get()方法返回括号值
(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2);//执行需要2秒 } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); try { TimeUnit.SECONDS.sleep(1);//等待需要1秒 } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());//执2-等1 返回true+备胎值completeValue } }
对计算结果进行处理
thenApply
计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化, CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).thenApply(f -> { System.out.println("222"); return f + 1; }).thenApply(f -> { //int age = 10/0; // 异常情况:那步出错就停在那步。 System.out.println("333"); return f + 1; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: "+v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } //-----正常情况 //111 //222 //333 //----计算结果: 6 //-----异常情况 //111 //异常.....
handle
类似于thenApply,但是有异常的话仍然可以往下走一步。
public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化, // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理 CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).handle((f,e) -> { int age = 10/0;//异常语句 System.out.println("222"); return f + 1; }).handle((f,e) -> { System.out.println("333"); return f + 1; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: "+v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } //-----异常情况 //111 //333 //异常,可以看到多走了一步333
- 一般用thenApply
对计算结果进行消费
- 接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function
thenAccept
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f + 2; }).thenApply(f -> { return f + 3; }).thenApply(f -> { return f + 4; }).thenAccept(r -> System.out.println(r)); } //6 //消费一下,直接得到6
补充:Code之任务之间的顺序执行
thenRun
- thenRun(Runnable runnable)
- 任务A执行完执行B,并且B不需要A的结果
thenAccept
- thenAccept(Consumer action)
- 任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply
- thenApply(Function fn)
- 任务A执行完执行B,B需要A的结果,同时任务B有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); //null System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join()); //resultA打印出来的 null因为没有返回值 System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join()); //resultAresultB 返回值
CompleteFuture和线程池说明(非常重要)
- 上面的几个方法都有普通版本和后面加Async的版本
- 以
thenRun
和thenRunAsync
为例,有什么区别?
先看结论
- 没有传入自定义线程池,都用默认线程池ForkJoinPool
- 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
//2-1 public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); } } //1号任务 pool-1-thread-1 //2号任务 pool-1-thread-1 //3号任务 pool-1-thread-1 //4号任务 pool-1-thread-1
//2-2 public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRunAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); } } //1号任务 pool-1-thread-1 //2号任务 ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool //3号任务 ForkJoinPool.commonPool-worker-9 //4号任务 ForkJoinPool.commonPool-worker-9
也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRun(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); } } //1号任务 1号任务 pool-1-thread-1 //2号任务 main //3号任务 main //4号任务 main
源码
//CompletableFuture.java 2009行 public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的 return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool }
//进入asyncPool private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的 /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池
对计算速度进行选用
applyToEither
方法,快的那个掌权
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "play1 "; }); CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "play2"; }); CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用 return f + " is winner"; }); System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get()); } } //ForkJoinPool.commonPool-worker-9 ---come in //ForkJoinPool.commonPool-worker-2 ---come in //main play2 is winner
对计算结果进行合并
thenCombine
合并
- 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
- 先完成的先等着,等待其它分支任务
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return 10; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return 20; }); CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return x + y; }); System.out.println(thenCombineResult.get()); } } //30