JUC系列(十一) | Java 8 CompletableFuture 异步编程

简介: JUC系列(十一) | Java 8 CompletableFuture 异步编程

微信截图_20220525190838.png


多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!!

沉下去,再浮上来,我想我们会变的不一样的。


CompletableFuture


一、什么是CompletableFuture?


在Java中CompletableFuture用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息


在这种方式中,主线程不会被阻塞,因为子线程是另外一条线程在执行,所以不需要一直等到子线程完成。主线程就可以并行的执行其他任务。这种并行方式,可以极大的提供程序性能。


CompletableFuture 实现了 Future, CompletionStage 接口。


QQ截图20220525191012.png


  1. 实现了 Future接口CompletableFuture就可以兼容现在有线程池框架;


  1. CompletionStage 接口是异步编程的接口抽象,里面定义多种异步方法,实现了CompletionStage多种抽象方法和Future并与一起使用,从而才打造出了强大的CompletableFuture 类。


二、Future 与 CompletableFuture


CompletableFuture Future API的扩展。


Future接口源码上说明:


Future表示异步计算的结果。 提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。 结果只能在计算完成后使用get方法检索,必要时阻塞,直到它准备好。 取消由cancel方法执行。 提供了其他方法来确定任务是正常完成还是被取消。 一旦计算完成,就不能取消计算。 --来自谷歌翻译


Future 的主要缺点如下


(1)不能够手动的主动给完成任务(即不能手动的主动结束任务)


(2)Future 的结果在非阻塞的情况下,不能执行更进一步的操作


  • Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。就是它完成了,你不会被通知,只能主动去询问它。


(3)不能够支持链式调用


  • 就是不能将上一个Future的计算结果传递给下一个Future使用,即不能构成像Web中的Filter模式一样.


(4)不支持多个 Future 合并


  • 就是不能将多个Future合并起来。


(5)不支持异常处理


  • Future 的 API 没有任何的异常处理的 api,所以运行时,很有可能无法定位到错误。


  • Future API:


public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning); //尝试取消此任务的执行。
    boolean isCancelled();//如果此任务在正常完成之前被取消,则返回true 
    boolean isDone(); //如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true 
    V get() throws InterruptedException, ExecutionException; //获得任务计算结果
    V get(long timeout, TimeUnit unit) 
        throws InterruptedException, ExecutionException, TimeoutException;//可等待多少时间去获得任务计算结果
}


三、应用


3.1、创建CompletableFuture对象


CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:


//runAsync 返回void 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
//supplyAsync 异步返回一个结果 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
//Supplier  是一个函数式接口,代表是一个生成者的意思
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)


3.2、场景一:主动完成任务


场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo1 {
    /**
     * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "子线程开始干活");
                //子线程睡 5 秒
                Thread.sleep(5000);
//                //在子线程中完成主线程 如果注释掉这一行代码将会一直停住
                future.complete("success");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "A").start();
        //主线程调用 get 方法阻塞
        System.out.println("主线程调用 get 方法获取结果为: " + future.get());
        System.out.println("主线程完成,阻塞结束!!!!!!");
    }
}


3.3、场景二:没有返回值的异步任务


runAsync:返回一个新的 CompletableFuture,它在运行给定操作后由在ForkJoinPool.commonPool()运行的任务异步完成。


如果你想异步的运行一个后台任务并且不需要任务返回结果,就可以使用runAsync


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo2 {
    /**
     * 没有返回值的异步任务
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        //主线程阻塞
        future.get();
        System.out.println("主线程结束");
    }
}


3.4、场景三:有返回值的异步任务


supplyAsync:返回任务结果。


CompletableFuture.supplyAsync()它持有supplier 并且返回CompletableFutureT 是通过调用 传入的supplier取得的值的类型。


Supplier 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法可以写入你的后台任务中,并且返回结果。


public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo2 {
    /**
     * 有返回值的异步任务
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "子线程任务完成";
        });
        //主线程阻塞
        System.out.println(future.get());
        System.out.println("主线程结束");
    }
}
/**
 * 主线程开始
 * 子线程启动干活
 * 子线程任务完成
 * 主线程结束
 */


3.5、场景四:线程串行化


当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo4 {
    private static String action="";
    /**
     * 线程依赖
     * 1、我到了烧烤店,
     * 2、开始点烧烤
     * 3、和朋友次完烧烤 ,给女朋友带奶茶回去
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> {
                    action="和朋友一起去次烧烤!!!! ";
                    return action;
                }).thenApply(string -> {
                    return  action+="到店里——>开始点烧烤!!";
                }).thenApply(String->{
                    return  action+="和朋友们次完烧烤,给女朋友带杯奶茶回去!!";
                });
        String str = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + str);
    }
}
/**
 主线程开始
 主线程结束, 子线程的结果为:和朋友一起去次烧烤!!!到店里——>开始点烧烤!!和朋友们次完烧烤,给女朋友带杯奶茶回去!!
 */


3.6、场景五:thenAccept 消费处理结果


如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept() thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。


thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo5 {
    private static String action = "";
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                action = "逛淘宝,想买双鞋 ";
            } catch (Exception e) {
                e.printStackTrace();
            }
            return action;
        }).thenApply(string -> {
            return action + "选中了,下单成功!!";
        }).thenApply(String -> {
            return action + "等待快递到来";
        }).thenAccept(new Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + s);
            }
        });
    }
}
/**
 主线程开始
 子线程全部处理完成,最后调用了 accept,结果为:逛淘宝,想买双鞋 等待快递到来
 */


3.7、场景六:异常处理


exceptionally 异常处理,出现异常时触发,可以回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo6 {
    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i= 1/0;
            System.out.println("子线程执行中");
            return i;
        }).exceptionally(ex -> {
            System.out.println(ex.getMessage());
            return -1;
        });
        System.out.println(future.get());
    }
}
/**
 * 主线程开始
 * java.lang.ArithmeticException: / by zero
 * -1
 */


使用 handle() 方法处理异常API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用


CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务开始");
    int i=0/1;
    return i;
}).handle((i,ex) -> {
    System.out.println("进入 handle 方法");
    if (ex != null) {
        System.out.println("发生了异常,内容为:" + ex.getMessage());
        return -1;
    } else {
        System.out.println("正常完成,内容为: " + i);
        return i;
    }
});


3.8、场景七: 结果合并


thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo7 {
    private static Integer num = 10;
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //第一步加 10
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("让num+10;任务开始");
            num += 10;
            return num;
        });
        //合并
        CompletableFuture<Integer> future1 = future.thenCompose(i ->
                //再来一个 CompletableFuture
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        System.out.println(future.get());
        System.out.println(future1.get());
    }
}
/**
 * 主线程开始
 * 让num+10;任务开始
 * 20
 * 21
 */


thenCombine 合并两个没有依赖关系的 CompletableFutures 任务


package com.crush.juc09;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo8 {
    private static Integer sum = 0;
    private static Integer count = 1;
    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始");
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("从1+...+50开始");
            for (int i=1;i<=50;i++){
                sum+=i;
            }
            System.out.println("sum::"+sum);
            return sum;
        });
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("从1*...*10开始");
            for (int i=1;i<=10;i++){
                count=count*i;
            }
            System.out.println("count::"+count);
            return count;
        });
        //合并两个结果
        CompletableFuture<Object> future = job1.thenCombine(job2, new
                BiFunction<Integer, Integer, List<Integer>>() {
                    @Override
                    public List<Integer> apply(Integer a, Integer b) {
                        List<Integer> list = new ArrayList<>();
                        list.add(a);
                        list.add(b);
                        return list;
                    }
                });
        System.out.println("合并结果为:" + future.get());
    }
}
/**
 主线程开始
 从1*...*10开始
 从1+...+50开始
 sum::1275
 count::3628800
 合并结果为:[1275, 3628800]
 */


3.9、场景八:合并多个任务的结果


allOf 与 anyOf

allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情


/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo9 {
    private static Integer num = 10;
    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始");
        List<CompletableFuture> list = new ArrayList<>();
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        list.add(job1);
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job2);
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("减以 10 任务开始");
            num = num - 10;
            return num;
        });
        list.add(job3);
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("除以 10 任务开始");
            num = num / 10;
            return num;
        });
        list.add(job4);
        //多任务合并
        List<Integer> collect =
                list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
        System.out.println(collect);
    }
}
/**主线程开始
 乘以 10 任务开始
 加 10 任务开始
 减以 10 任务开始
 除以 10 任务开始
 [110, 100, 100, 10]
*/


anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束


package com.crush.juc09;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
 * @Author: crush
 * @Date: 2021-08-23 9:08
 * version 1.0
 */
public class CompletableFutureDemo10 {
    private static Integer num = 10;
    /**
     * 先对一个数加 10,然后取平方
     * @param args
     */
    public static void main(String[] args) throws Exception{
        System.out.println("主线程开始");
        CompletableFuture<Integer>[] futures = new CompletableFuture[4];
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(5000);
                System.out.println("加 10 任务开始");
                num += 10;
                return num;
            }catch (Exception e){
                return 0;
            }
        });
        futures[0] = job1;
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(2000);
                System.out.println("乘以 10 任务开始");
                num = num * 10;
                return num;
            }catch (Exception e){
                return 1;
            }
        });
        futures[1] = job2;
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(3000);
                System.out.println("减以 10 任务开始");
                num = num - 10;
                return num;
            }catch (Exception e){
                return 2;
            }
        });
        futures[2] = job3;
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
            try{
                Thread.sleep(4000);
                System.out.println("除以 10 任务开始");
                num = num / 10;
                return num;
            }catch (Exception e){
                return 3;
            }
        });
        futures[3] = job4;
        CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
        System.out.println(future.get());
    }
}
//主线程开始
//乘以 10 任务开始
//100


四、小结


本文只是做了一点简单介绍,还需要大家更深入的了解。


🌈自言自语


最近又开始了JUC的学习,感觉Java内容真的很多,但是为了能够走的更远,还是觉得应该需要打牢一下基础。


最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们

一起学习,一起讨论吧。


你好,我是博主宁在春,Java学习路上的一颗小小的种子,也希望有一天能扎根长成苍天大树。


希望与君共勉😁


我们:待别时相见时,都已有所成


目录
相关文章
|
28天前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
2月前
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。
|
21天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
4月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【7月更文挑战第1天】Java 8的CompletableFuture革新了异步编程,提供链式处理和优雅的错误处理。反应式编程,如Project Reactor,强调数据流和变化传播,擅长处理大规模并发和延迟敏感任务。两者结合,如通过Mono转换CompletableFuture,兼顾灵活性与资源管理,提升现代Java应用的并发性能和响应性。开发者可按需选择和融合这两种技术,以适应高并发环境。
50 1
|
5月前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【6月更文挑战第30天】Java 8的CompletableFuture革新了异步编程,提供如thenApply等流畅接口,而Java 9后的反应式编程(如Reactor)强调数据流和变化传播,以事件驱动应对高并发。两者并非竞争关系,而是互补,通过Flow API和第三方库结合,如将CompletableFuture转换为Mono进行反应式处理,实现更高效、响应式的系统设计。开发者可根据需求灵活选用,提升现代Java应用的并发性能。
67 1
|
2月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
2月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
127 6
【Java学习】多线程&JUC万字超详解
|
5月前
|
存储 缓存 Java
深入剖析Java并发库(JUC)之StampedLock的应用与原理
深入剖析Java并发库(JUC)之StampedLock的应用与原理
深入剖析Java并发库(JUC)之StampedLock的应用与原理
|
4月前
|
并行计算 算法 Java
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
Java面试题:解释Java中的无锁编程的概念,Java中的Fork/Join框架的作用和使用场景,Java中的CompletableFuture的作用和使用场景
33 0
|
4月前
|
安全 Java 数据库连接
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
Java面试题:解释Java内存模型的无锁编程支持,并讨论其优势和局限性,解释Java中的CompletableFuture的工作原理,并讨论其在异步编程中的应用
29 0