Java异步编程CompletableFuture

简介: Java异步编程CompletableFuture

Java异步编程CompletableFuture

1.  简介

CompletableFuturejdk1.8引入的具有任务编排能力的类,它弥补了Future的不足,能让线程按照一定顺序和规则执行,提高程序执行效率.

2.  Future的局限性

之前我们处理异步任务可以使用future,虽然future也能实现异步任务,但是存在一定局限性,下面我们通过一个案例来演示

使用的工具类

/**

 * 工具类

 */

public class CommonUtil {

 

    // 模拟任务耗时

    public static void dowork(long ms){

        try {

            TimeUnit.MILLISECONDS.sleep(ms);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

 

    // 打印线程信息

    public static void printLog(String msg){

        Thread thread = Thread.currentThread();

        System.out.println(System.currentTimeMillis()+"=="+thread.getId()+"=="+thread.getName()+"=="+msg);

    }

}

演示demo

package cn.demo;

 

import cn.demo.util.CommonUtil;

 

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

/**

 * 需求:使用线程池执行3个任务,其中任务3需要获取任务1,任务2的执行结果后再执行

 */

public class FutureTest {

 

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 创建定长线程池

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 执行任务1

        Future<String> f1 = executorService.submit(() -> {

            CommonUtil.printLog("任务1开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务1结束");

            return "a";

        });

        // 执行任务2

        Future<String> f2 = executorService.submit(() -> {

            CommonUtil.printLog("任务2开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务2结束");

            return "b";

        });

 

        // 执行任务3,任务3需要获取任务1,2的执行结果再处理

        Future<String> f3 = executorService.submit(() -> {

            // 获取任务1,2执行结果

            String f1Result = f1.get();

            String f2Result = f2.get();

            CommonUtil.printLog("任务3开始");

            String result = f1Result + f2Result;

            CommonUtil.printLog("任务3结束");

            return result;

        });

        // 获取结果

        CommonUtil.printLog(f3.get());

        executorService.shutdown();

    }

}

通过上述案例,我们发现future存在以下问题

1.    无法对future结果进行进一步处理,需要手动get,get会阻塞线程

2.    处理有依赖关系的future比较麻烦

3.    无法对多个future合并结果

4.    异常需自己处理

3.  使用CompletableFuture

3.1.创建异步任务的方式

创建异步任务有2种方式,一种是使用runAsync,一种是supplyAsync,区别在于runAsync没有返回值supplyAsync.

/**

     *创建异步任务方式1,无返回值

     * @throws Exception

     */

    @Test

    public void testRunAsync() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture.runAsync(()-> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.dowork(4000);

        CommonUtil.printLog("main end");

    }

 

    /**

     *创建异步任务方式2,有返回值

     * @throws Exception

     */

    @Test

    public void testSupplyAsync() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog(supplyAsync.get());

        CommonUtil.printLog("main end");

    }

 

3.2.异步任务中的线程池

异步任务中的线程池,默认用的是ForkJoinPool.commonPool

异步任务共享ForkJoinPool.commonPool线程池,主线程结束时,会自动关闭共享线程池

如果使用共享线程池遇到其中有些任务耗时比较长,可能会影响性能,因此建议使用自定义线程池

/**

     * 异步任务中的线程池,默认用的是ForkJoinPool.commonPool

     * 异步任务共享ForkJoinPool.commonPool线程池,主线程结束时,会关闭共享线程池

     * 如果使用共享线程池遇到其中有些任务耗时比较长,可能会影响性能,建议使用自定义线程池

     * @throws Exception

     */

    @Test

    public void testSupplyByExcutor() throws Exception{

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CommonUtil.printLog("main start");

        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        },executorService);

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog(supplyAsync.get());

        CommonUtil.printLog("main end");

    }

3.3.异步任务回调

异步任务回调有3种方式

thenApply  返回回调结果并处理

thenAccept  不想返回任何回调结果,一般在调用链的最后一步

thenRun  异步任务完成后,只想得到通知,不使用上一步的结果

 /**

     * 异步任务回调之thenApply,返回回调结果并处理

     * @throws Exception

     */

    @Test

    public void testThenApply() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        }).thenApply(result -> {

            String[] split = result.split("");

            return split;

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog(Arrays.toString(cf.get()));

        CommonUtil.printLog("main end");

    }

 

    /**

     * 异步任务回调之thenAccept,不想返回任何回调结果,一般在调用链的最后一步

     * @throws Exception

     */

    @Test

    public void testThenAccept() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        }).thenAccept(result->{

            CommonUtil.printLog("result:"+result);

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.dowork(4000);

        CommonUtil.printLog("main end");

    }

 

    /**

     * 异步任务回调之thenRun,异步任务完成后,只想得到通知,不使用上一步的结果

     * @throws Exception

     */

    @Test

    public void testThenRun() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        }).thenRun(()->{

            CommonUtil.printLog("任务执行完成");

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.dowork(4000);

        CommonUtil.printLog("main end");

    }

3.4.异步任务回调优化

异步任务回调优化,带有xxxAsync,例如thenApplyAsync,thenAcceptAsync等这表示在单独的线程中执行回调任务,这样可以提高效率

备注:如果supplyAsync是立即返回结果,非耗时操作,不会开启线程池来执行,直接用主线程执行

/**

     * 异步任务回调优化,带有xxxAsync,例如thenApplyAsync,thenAcceptAsync

     * 这表示在单独的线程中执行回调任务

     * 备注:如果supplyAsync是立即返回结果,非耗时操作,不会开启线程池来执行,直接用主线程执行

     * @throws Exception

     */

    @Test

    public void testThenApplyAsync() throws Exception{

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        CommonUtil.printLog("main start");

        CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        }).thenApplyAsync(result -> {

            CommonUtil.printLog("result:"+result);

            String[] split = result.split("");

            return split;

        },executorService);

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog(Arrays.toString(cf.get()));

        CommonUtil.printLog("main end");

    }

3.5.异步任务编排

编排2个异步任务的方式

thenCompose:连接2个有依赖关系的任务,结果由第2个任务处理

thenCombine:连接2个没有依赖关系的任务,合并2个任务的结果

 /**

     * 异步任务编排2个之thenCompose,目的是连接2个有依赖关系的任务,结果由第2个任务处理

     * @throws Exception

     */

    @Test

    public void testThenCompose() throws Exception{

        CommonUtil.printLog("main start");

        CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务结束");

            return "123";

        }).thenCompose(result -> CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("result:" + result);

            String[] split = result.split("");

            return split;

        }));

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog(Arrays.toString(cf.get()));

        CommonUtil.printLog("main end");

    }

 

    /**

     * 异步任务编排2个之thenCombine,连接2个没有依赖关系的任务,合并2个任务的结果

     * @throws Exception

     */

    @Test

    public void testThenCombine() throws Exception{

        CommonUtil.printLog("main start");

        // 任务1

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务1开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务1结束");

            return "a";

        });

        // 任务2

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务2开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务2结束");

            return "b";

        });

        // 合并2个任务的结果并处理

        CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (result1, result2) -> {

            CommonUtil.printLog("result1:" + result1);

            CommonUtil.printLog("result2:" + result2);

            return result1 + result2;

        });

        CommonUtil.printLog("=======main continue========");

        // 获取最终结果

        CommonUtil.printLog(cf3.get());

        CommonUtil.printLog("main end");

 

    }

编排多个异步任务的方式

Allof:合并多个异步任务,所有任务完成可以做进一步操作

Anyof:合并多个异步任务,任一执行任务执行完就返回结果

  /**

     * 异步任务编排多个之allof,合并多个异步任务,所有任务完成可以做进一步操作

     * @throws Exception

     */

    @Test

    public void testAllof() throws Exception{

        CommonUtil.printLog("main start");

        // 任务1

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务1开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务1结束");

            return "a";

        });

        // 任务2

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务2开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务2结束");

            return "b";

        });

        // 任务3

        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务3开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务3结束");

            return "c";

        });

        // 合并多个任务的结果

        CompletableFuture<String> cf = CompletableFuture.allOf(cf1, cf2, cf3).thenApply(v -> {

            // 这里使用join处理结果,getjoin的区别在于,get需要自己处理异常,join的是运行时异常不需要自己处理,更适合流式编程

            String result = cf1.join() + cf2.join() + cf3.join();

            CommonUtil.printLog("result:" + result);

            return result;

        });

        CommonUtil.printLog("=======main continue========");

        // 获取最终结果

        CommonUtil.printLog(cf.get());

        CommonUtil.printLog("main end");

 

 

    }

 

    /**

     * 异步任务编排多个之anyof,合并多个异步任务,任一执行任务执行完就返回结果

     * @throws Exception

     */

    @Test

    public void testAnyof() throws Exception{

        CommonUtil.printLog("main start");

        // 任务1

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务1开始");

            CommonUtil.dowork(2000);

            CommonUtil.printLog("任务1结束");

            return "任务1";

        });

        // 任务2

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务2开始");

            CommonUtil.dowork(1000);

            CommonUtil.printLog("任务2结束");

            return "任务2";

        });

        // 任务3

        CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {

            CommonUtil.printLog("任务3开始");

            CommonUtil.dowork(3000);

            CommonUtil.printLog("任务3结束");

            return "任务3";

        });

        CompletableFuture<Object> cf = CompletableFuture.anyOf(cf1, cf2, cf3).thenApply(result -> {

            CommonUtil.printLog("result:" + result);

            return result;

        });

        CommonUtil.printLog("=======main continue========");

        CommonUtil.printLog("获取结果:"+cf.join());

        CommonUtil.printLog("main end");

 

 

    }

3.6.异步任务异常处理

异常处理有2种方式

Exceptionally:当回调链一旦出现异常,都不会再向下执行,直接转入异常处理

Handle:无论是否发生异常都会执行,用于恢复调用链中的一次异常,恢复后继续传递

/**

     * 异步任务异常处理之exceptionally,当回调链一旦出现异常,都不会再向下执行,直接转入异常处理

     * @throws Exception

     */

    @Test

    public void testExceptionally() throws Exception{

        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {

            // int i = 1 / 0;

            return "result1";

        }).thenApply(res -> {

            // 模拟空指针异常

            String str = null;

            int i = str.length();

            return res + " result2";

        }).thenApply(res -> {

            return res + " result3";

        }).exceptionally(ex -> {

            CommonUtil.printLog("出现异常:" + ex.getMessage());

            return "unkonwn";

        });

        CommonUtil.printLog("获取结果:"+cf.join());

 

    }

 

    /**

     * 异步任务异常处理之handle,无论是否发生异常都会执行,用于恢复调用链中的一次异常,恢复后继续传递

     * @throws Exception

     */

    @Test

    public void testHandle() throws Exception{

        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {

            int i = 1 / 0;

            return "result1";

        }).handle((res,ex)->{

            // 使用handle处理异常,根据内部if else分支处理逻辑

            // 如果没有异常,ex为空,传递正常结果,反之

            if(!Objects.isNull(ex)){

                CommonUtil.printLog("出现异常:" + ex.getMessage());

                return "unknow1";

            }

            return res;

        }).thenApply(res -> {

            // 模拟空指针异常

            String str = null;

            int i = str.length();

            return res + " result2";

        }).handle((res,ex)->{

            // 使用handle处理异常,根据内部if else分支处理逻辑

            // 如果没有异常,ex为空,传递正常结果,反之

            if(!Objects.isNull(ex)){

                CommonUtil.printLog("出现异常:" + ex.getMessage());

                return "unknow2";

            }

            return res;

        }).thenApply(res -> {

            return res + " result3";

        });

        CommonUtil.printLog("获取结果:"+cf.join());

    }


 

 

 

相关文章
|
5天前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
13天前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【4月更文挑战第17天】本文探讨了Java中的CompletableFuture和反应式编程在提升异步编程体验上的作用。CompletableFuture作为Java 8引入的Future扩展,提供了一套流畅的链式API,简化异步操作,如示例所示的非阻塞数据库查询。反应式编程则关注数据流和变化传播,通过Reactor等框架实现高度响应的异步处理。两者结合,如将CompletableFuture转换为Mono或Flux,可以兼顾灵活性和资源管理,适应现代高并发环境的需求。开发者可按需选择和整合这两种技术,优化系统性能和响应能力。
|
21天前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
|
3月前
|
前端开发 Java API
Java并发基础:CompletableFuture全面解析
CompletableFuture类使得并发任务的处理变得简单而高效,通过简洁的API,开发者能轻松创建、组合和链式调用异步操作,无需关心底层线程管理,这不仅提升了程序的响应速度,还优化了资源利用率,让复杂的并发逻辑变得易于掌控。
Java并发基础:CompletableFuture全面解析
|
3月前
|
Java
深入理解 Java 异步编程:Future 和 CompletableFuture 的全面比较
深入理解 Java 异步编程:Future 和 CompletableFuture 的全面比较
40 0
|
4月前
|
前端开发 JavaScript Java
每日一博 - Java 异步编程的 Promise 模式 CompletableFuture的前世今生 (上)
每日一博 - Java 异步编程的 Promise 模式 CompletableFuture的前世今生 (上)
68 0
每日一博 - Java 异步编程的 Promise 模式 CompletableFuture的前世今生 (上)
|
6月前
|
Java 测试技术
Java8 异步非阻塞做法:CompletableFuture 两万字详解
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利
Java 8 的异步利器:CompletableFuture源码解析(建议精读)
实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。
|
10月前
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
172 1
Java新特性:异步编排CompletableFuture
|
Java API 网络架构
20个使用 Java CompletableFuture的示例(下)
20个使用 Java CompletableFuture的示例(下)
233 1