CompletableFuture 异步处理

简介: CompletableFuture 异步处理

不知道大家是否对异步有所了解; 异步初级版

先给大家简单举例介绍下:


我们传统的程序都是单线程的,程序的运行是同步的。(一个执行完一个执行)

比如查询A表要20秒 ,查询B表也要20秒 那么在同步执行的情况下总共需要40秒的时间。

而异步处理是将这2个查询异步的去进行,总共只需要20秒,极大的提高了系统的吞吐量。异步就是从主线程发射一个子线程来完成任务.


大家对异步有一个简单认识之后;


我们为什么要使用异步呢?


当程序向另外⼀台服务器发出请求,由于⽹络等外部原因,此种通信任务往往会耗费⼤量时间,进程如果在此期间仅仅只能等待⽹络或⽹络上其他机器的响应,将严重地降低了性能。程序本就不应该浪费等待的时间,⽽应该更加⾼效地利⽤,在等待的时间执⾏其他任务,回复到达后在继续执⾏ 第⼀个任务。 如果程序调⽤某个⽅法,等待其执⾏全部处理后才能继续执⾏,我们称其为同步。相反,在处理完成之前就返回调⽤⽅法则是异步的。


异步编程相对于单体线程而言,不像单体编程那样等上面的代码执行完才能执行下面的 ,他是可以同时多部分一起进行执行行的,在代码运行过程能大大节省时间提升效率,提升用户体验;


CompletableFuture


CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。

通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

PPT

如需要可私信; 暂放文件上传


supplyAsync有两种签名:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)


第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin线程池来执行被提交的任务。


第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。

简单来说也就是第一种是使用默认线程池的,第二种则可以指定线程池;有返回值的异步任务;


CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名: 没有返回值的任务;

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)


allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。


anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果


Demo:


package com.example.yan.excel_split.demo;
import java.text.SimpleDateFormat;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException,InterruptedException {
        //获取开始时间(用来统计程序运行时常)
        long t1 = System.currentTimeMillis();
        //同步方式
       //  notAsync();
        //异步方式
        async();
        //获取结束时间(用来统计程序运行时常)
        long t2 = System.currentTimeMillis();
        System.out.println("Main 运行时间 :" + (t2-t1)/1000+"秒");
    }
    public static void notAsync(){
        System.out.println("---------------------------notAsync方法开始运行-----------------------------");
        //获取开始时间(用来统计程序运行时常)
        long t1 = System.currentTimeMillis();
        System.out.println("notAsync方法开始运行。。。。。");
        System.out.println("开始时间 :" + dateFormat(t1));
        int callSum1 = call1(1, 2);
        int callSum2 = call2(3,4);
        run1(3,4);
        run2(3,4);
        //获取结束时间(用来统计程序运行时常)
        long t2 = System.currentTimeMillis();
        System.out.println("结束时间 :" + dateFormat(t2));
        System.out.println("---------------------------notAsync方法运行结束-----------------------------");
        System.out.println("运行时间 :" + (t2-t1)/1000+"秒");
    }
    public static void async() throws ExecutionException ,InterruptedException{
        System.out.println("---------------------------async方法开始运行-----------------------------");
        //异步方法 有返回值一 (均采用默认线程池)
        CompletableFuture<Integer> callFuture1 = CompletableFuture.supplyAsync(()->call1(1, 2));
        //异步方法 有返回值二
        CompletableFuture<Integer> callFuture2 = CompletableFuture.supplyAsync(()->call2(3,4));
        //异步方法 无返回值一
        CompletableFuture runFuture1 = CompletableFuture.runAsync(()->run1(3,4));
        //异步方法 无返回值二
        CompletableFuture runFuture2 = CompletableFuture.runAsync(()->run2(3,4));
        //allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
        //anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
        CompletableFuture.allOf(callFuture1, callFuture2,runFuture1, runFuture2);
        //获取返回结果
        callFuture1.get();
        //获取返回结果
        callFuture2.get();
        //获取返回结果
        runFuture1.get();
        //获取返回结果
        runFuture2.get();
        System.out.println("---------------------------async方法结束运行-----------------------------");
    }
    private static Integer call1(Integer num1, Integer num2) {
        long t1 = System.currentTimeMillis();
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int sum = num1 + num2;
        long t2 = System.currentTimeMillis();
        System.out.println("有返回值--》supplyAsync--》call1  睡眠时间:" + (t2-t1)/1000+"秒  运行结果:"+ sum);
        return sum;
    }
    private static Integer call2(int num1, int num2) {
        long t1 = System.currentTimeMillis();
        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int sum = num1 + num2;
        long t2 = System.currentTimeMillis();
        System.out.println("有返回值--》supplyAsync--》call2  睡眠时间:" + (t2-t1)/1000+"秒  运行结果:"+ sum);
        return sum;
    }
    private static void run1(int num1, int num2) {
        long t1 = System.currentTimeMillis();
        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int sum = num1 + num2;
        long t2 = System.currentTimeMillis();
        System.out.println("无返回值--》runAsync--》run1  睡眠时间:" + (t2-t1)/1000+"秒  运行结果:"+ sum);
    }
    private static void run2(int num1, int num2) {
        long t1 = System.currentTimeMillis();
        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int sum = num1 + num2;
        long t2 = System.currentTimeMillis();
        System.out.println("无返回值--》runAsync--》run2  睡眠时间:" +(t2-t1)/1000+"秒  运行结果:"+ sum);
    }
    public static String dateFormat(long l){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String format = sdf.format(l);
        return format;
    }
}


同步执行结果异步执行结果:

---------------------------notAsync方法开始运行-----------------------------
notAsync方法开始运行。。。。。
开始时间 :2022-07-16 20:46:19
有返回值--》supplyAsync--》call1  睡眠时间:10秒  运行结果:3
有返回值--》supplyAsync--》call2  睡眠时间:20秒  运行结果:7
无返回值--》runAsync--》run1  睡眠时间:20秒  运行结果:7
无返回值--》runAsync--》run2  睡眠时间:20秒  运行结果:7
结束时间 :2022-07-16 20:47:29
---------------------------notAsync方法运行结束-----------------------------
运行时间 :70秒
Main 运行时间 :70秒


异执行结果异步执行结果:

---------------------------async方法开始运行-----------------------------
有返回值--》supplyAsync--》call1  睡眠时间:10秒  运行结果:3
有返回值--》supplyAsync--》call2  睡眠时间:20秒  运行结果:7
无返回值--》runAsync--》run1  睡眠时间:20秒  运行结果:7
无返回值--》runAsync--》run2  睡眠时间:20秒  运行结果:7
---------------------------async方法结束运行-----------------------------
Main 运行时间 :20秒


结构图:



扩展:


1.同步与异步

同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)

所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。

换句话说,就是由调用者主动等待这个调用的结果。

而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

典型的异步编程模型比如Node.js


举个通俗的例子:

你打电话问书店老板有没有《黑道是怎么炼成》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。

而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。


阻塞:进程给CPU传达一个任务之后,一直等待CPU处理完成,然后才执行后面的操作。

非阻塞:进程给CPU传达任我后,继续处理后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。


阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。


非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

还是上面的例子,


你打电话问书店老板有没有《怎么榜上富婆》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,

如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩, 当然你也要偶尔过几分钟check 问一下老板有没有返回结果。

相关文章
|
7月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
76 2
CompletableFuture事务处理
CompletableFuture事务处理
260 0
|
监控 Java API
并发编程 - CompletableFuture
并发编程 - CompletableFuture
82 0
|
3月前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
6月前
|
IDE Java API
玩转 CompletableFuture 异步编程
玩转 CompletableFuture 异步编程
37 0
|
7月前
|
Java
Future:异步任务结果获取
Future:异步任务结果获取
74 0
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
Java 调度
并发编程——Future & CompletableFuture
Java创建线程的方式,一般常用的是Thread,Runnable。如果需要当前处理的任务有返回结果的话,需要使用Callable。Callable运行需要配合Future。
63 0
|
开发工具
JUC(一)异步调用CompletableFuture
JUC(一)异步调用CompletableFuture
JUC(一)异步调用CompletableFuture