剑指JUC原理-17.CompletableFuture(上)

本文涉及的产品
可视分析地图(DataV-Atlas),3 个项目,100M 存储空间
数据可视化DataV,5个大屏 1个月
简介: 剑指JUC原理-17.CompletableFuture

Future和Callable接口


  • Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)
  • 一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

  • 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。
  • 有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)


FutureTask实现类


  • 在源码可以看到,他既继承了RunnableFuture接口,也在构造方法中实现了Callable接口(有返回值、可抛出异常)和Runnable接口

  public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

完成上面目的的代码 - 多线程/有返回/异步


一个主线程,一个mythread|步执行了|返回了"hello callable"

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(futureTask.get());//接收返回值
    }
}
class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ----异步执行");
        return "hello Callable 返回值";
    }
}
//结果
//-----come in call() ----异步执行
//hello Callable 返回值


Future到CompletableFuture


Future优点


  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。
  • 方案一,3个任务1个main线程处理,大概1130ms

  • 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概530毫秒


Future缺点


get()阻塞


一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        System.out.println(futureTask.get());
        //----------------------------------------------------------注意顺序
    }
}
//main  -------主线程忙其他任务了
//t1   ------副线程come in
public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        //----------------------------------------------------------注意顺序
    }
}
//t1   ------副线程come in
//-------------------5秒后才出现下面的结果-------------说明一旦调用get()方法直接去找副线程了,阻塞了主线程
//task over
//main  -------主线程忙其他任务了


isDone() 轮循


利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
      //1-------  System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等3秒,过了3秒直接抛出异常
        //2-------更健壮的方式-------轮询方法---等副线程拿到才去get()
        //但是也会消耗cpu资源
        while(true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else{
                //暂停毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("正在处理中------------正在处理中");
            }
        }
    }
}
//main  -------主线程忙其他任务了
//t1   ------副线程come in
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//task over


CompletableFuture基本介绍


阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture


CompletableFuture


public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Completable Future的方法。


它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。


它实现了FutureCompletion Stage接口


CompletionStage


Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段


核心的四个静态方法(分为两组)


  • 关键就是 |有没有返回值|是否用了线程池|
  • 参数说明:
  • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
  • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。


runAsync无返回值


runAsync


public static CompletableFuture<Void> runAsync(Runnable runnable)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(voidCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9 //默认的线程池
//null --- 没有返回值


runAsync+线程池


public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },executorService);
        System.out.println(voidCompletableFuture.get());
    }
}
//pool-1-thread-1   ----指定的线程池
//null ----没有返回值


supplyAsync有返回值


supplyAsync


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        });
        System.out.println(objectCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了


supplyAsync+线程池


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池
        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        },executorService);
        System.out.println(objectCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了


CompletableFuture使用演示(日常使用)


基本功能


CompletableFuture可以完成Future的功能

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果"+result);
            return result;
        });
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        System.out.println(objectCompletableFuture.get());
    }
}
//main线程先去忙其他任务
//ForkJoinPool.commonPool-worker-9----副线程come in
//1秒钟后出结果6
//6


减少阻塞和轮询whenComplete


CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        }).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            // 自动回调,不需要get
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });
        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue3

假如换用自定义线程池

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });
        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//pool-1-thread-1--------副线程come in
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue6

异常情况的展示,设置一个异常 int i = 10 / 0 ;

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----结果---异常判断值---"+result);
            //---------------------异常情况的演示--------------------------------------
            if(result > 2){
                int i  = 10 / 0 ;//我们主动的给一个异常情况
            }
            //------------------------------------------------------------------
            return result;
        },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });
        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//pool-1-thread-1--------副线程come in
//main线程先去忙其他任务
//-----结果---异常判断值---4                (这里 4 >2了,直接抛出异常)
//异常情况java.lang.ArithmeticException: / by zero  java.lang.ArithmeticException: / by zero
//java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
//  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
//  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
//  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
//  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
//  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
//  at java.lang.Thread.run(Thread.java:748)
//Caused by: java.lang.ArithmeticException: / by zero
//  at com.zhang.admin.controller.CompletableFutureUseDemo.lambda$main$0(CompletableFutureUseDemo.java:19)
//  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
//  ... 3 more


剑指JUC原理-17.CompletableFuture(下):https://developer.aliyun.com/article/1413662

相关实践学习
DataV Board用户界面概览
本实验带领用户熟悉DataV Board这款可视化产品的用户界面
阿里云实时数仓实战 - 项目介绍及架构设计
课程简介 1)学习搭建一个数据仓库的过程,理解数据在整个数仓架构的从采集、存储、计算、输出、展示的整个业务流程。 2)整个数仓体系完全搭建在阿里云架构上,理解并学会运用各个服务组件,了解各个组件之间如何配合联动。 3&nbsp;)前置知识要求 &nbsp; 课程大纲 第一章&nbsp;了解数据仓库概念 初步了解数据仓库是干什么的 第二章&nbsp;按照企业开发的标准去搭建一个数据仓库 数据仓库的需求是什么 架构 怎么选型怎么购买服务器 第三章&nbsp;数据生成模块 用户形成数据的一个准备 按照企业的标准,准备了十一张用户行为表 方便使用 第四章&nbsp;采集模块的搭建 购买阿里云服务器 安装 JDK 安装 Flume 第五章&nbsp;用户行为数据仓库 严格按照企业的标准开发 第六章&nbsp;搭建业务数仓理论基础和对表的分类同步 第七章&nbsp;业务数仓的搭建&nbsp; 业务行为数仓效果图&nbsp;&nbsp;
目录
相关文章
|
6月前
|
Java
剑指JUC原理-14.ReentrantLock原理(下)
剑指JUC原理-14.ReentrantLock原理
38 1
|
6月前
|
Java BI API
剑指JUC原理-17.CompletableFuture(下)
剑指JUC原理-17.CompletableFuture
58 0
|
6月前
|
SQL Java 数据库连接
剑指JUC原理-15.ThreadLocal(上)
剑指JUC原理-15.ThreadLocal
78 1
|
6月前
|
存储 算法 安全
剑指JUC原理-5.synchronized底层原理(上)
剑指JUC原理-5.synchronized底层原理
57 0
|
6月前
|
安全 Java 程序员
剑指JUC原理-14.ReentrantLock原理(上)
剑指JUC原理-14.ReentrantLock原理
45 0
|
6月前
|
Java Linux API
剑指JUC原理-2.线程
剑指JUC原理-2.线程
62 0
|
6月前
|
存储 Java 数据安全/隐私保护
剑指JUC原理-15.ThreadLocal(中)
剑指JUC原理-15.ThreadLocal
48 0
|
6月前
|
NoSQL 前端开发 Java
剑指JUC原理-20.并发编程实践(中)
剑指JUC原理-20.并发编程实践
67 0
|
6月前
|
消息中间件 存储 Java
剑指JUC原理-20.并发编程实践(下)
剑指JUC原理-20.并发编程实践
64 0
|
6月前
|
存储 Java 编译器
剑指JUC原理-5.synchronized底层原理(下)
剑指JUC原理-5.synchronized底层原理
52 0