异步神器CompletableFuture

简介: 异步神器CompletableFuture

背景


CompletableFuture,提供了很多函数式接口,当接口达到瓶颈时,很有可能需要使用多线程优化项目


想方便的异步执行任务,就必须放到单独的线程中。继承Thread类,实现Runnable都不能拿到任务的执行结果,这时就不得不提创建线程的另一种方式了,实现Callable接口。


对于简单的场景使用Future并没有什么不方便。但是一些复杂的场景就很麻烦,

如2个异步任务,其中一个有结果就直接返回。Future用起来就不方便,因为想获取结果时,要么执行future.get()方法,但是这样会阻塞线程,变成同步操作,要么轮询isDone()方法,但是比较耗费CPU资源。


Netty和Google guava为了解决这个问题,在Future的基础上引入了观察者模式(即在Future上addListener),当计算结果完成时通知监听者。


使用方式

//异步线程
线程1 -> do somthing -> then do somthing
//异步线程
线程2 -> do somthing -> then do somthing


提供了比较全面的方法,而且有异常处理的方法,使用起来较为方便


Java8新增的CompletableFuture则借鉴了Netty等对Future的改造,简化了异步编程的复杂性,并且提供了函数式编程的能力


创建CompletableFuture对象


方法名 描述
completedFuture(U value) 返回一个已经计算好的CompletableFuture
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
runAsync(Runnable runnable, Executor executor) 使用指定的线程池执行任务,没有返回值
supplyAsync(Supplier supplier) 使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
supplyAsync(Supplier supplier, Executor executor) 使用指定的线程池执行任务,有返回值
@FunctionalInterface
public interface Supplier<T> {
    T get();
}

Supplier是一个能获取返回值的函数式接口

CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println(stringFuture.get());

计算结果完成时


方法名
whenComplete(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

因为入参是BiConsumer<? super T,? super Throwable>函数式接口,所以可以处理正常和异常的计算结果


whenComplete和whenCompleteAsync的区别如下


whenComplete:执行完当前任务的线程继续执行whenComplete的任务

whenCompleteAsync:把whenCompleteAsync这个任务提交给线程池来执行


CompletableFuture的所有方法的定义和whenComplete都很类似

方法不以Async结尾意味着使用相同的线程执行

方法以Async结尾意味着将任务提交到线程池来执行

方法以Async结尾时可以用ForkJoinPool.commonPool()作为线程池,也可以使用自己的线程池

后续介绍的所有方法都只写一种case

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).whenComplete((v, e) -> {
    // hello
    System.out.println(v);
});
// hello
System.out.println(future.get());

转换,消费,执行


方法名 描述
thenApply 获取上一个任务的返回,并返回当前任务的值
thenAccept 获取上一个任务的返回,单纯消费,没有返回值
thenRun 上一个任务执行完成后,开始执行thenRun中的任务
CompletableFuture.supplyAsync(() -> {
    return "hello ";
}).thenAccept(str -> {
    // hello world
    System.out.println(str + "world");
}).thenRun(() -> {
    // task finish
    System.out.println("task finish");
});

组合(两个任务都完成)


方法名 描述
thenCombine 组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth 组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
runAfterBoth 组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    return "today ";
}).thenApply(t -> {
    return t + "is ";
}).thenCombine(CompletableFuture.completedFuture("tuesday"), (t, u) -> {
    return t + u;
}).whenComplete((t, e) -> {
    // today is tuesday
    System.out.println(t);
});

组合(只需要一个任务完成)


方法名 描述
applyToEither 两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值
acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值
runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "today is";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "tuesday";
});
CompletableFuture future = future1.applyToEither(future2, str -> str);
// today is tuesday 随机输出
System.out.println(future.get());

sleepRandom()为我写的一个随机暂停的函数

多任务组合

方法名 描述
allOf 当所有的CompletableFuture完成后执行计算
anyOf 任意一个CompletableFuture完成后执行计算

allOf的使用

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "today";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "is";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "tuesday";
});
// today is tuesday
CompletableFuture.allOf(future1, future2, future3)
        .thenApply(v ->
                Stream.of(future1, future2, future3)
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
        .thenAccept(System.out::print);

anyOf的使用

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "today";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "is";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleepRandom();
    return "tuesday";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1, future2, future3);
// today is tuesday 随机输出
System.out.println(resultFuture.get());

异常处理

方法名 描述
exceptionally 捕获异常,进行处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    return 100 / 0;
}).thenApply(num -> {
    return num + 10;
}).exceptionally(throwable -> {
    return 0;
});
// 0
System.out.println(future.get());

当然有一些接口能捕获异常

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    String str = null;
    return str.length();
}).whenComplete((v, e) -> {
    if (e == null) {
        System.out.println("正常结果为" + v);
    } else {
        // 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
        System.out.println("发生异常了" + e.toString());
    }
});

我们也可以用异常标识,在主线程抛出异常

//异步执行结果标识
AtomicBoolean flag = new AtomicBoolean(true);
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
  //do somthing
  return xxx;
}).exceptionally(throwable -> {
  flag.set(false);
  return null;
});
CompletableFuture completableFutureV2 = CompletableFuture.supplyAsync(() -> {
  //do somthing
  return xxx;
}).exceptionally(throwable -> {
  flag.set(false);
  return null;
});
Object obj = completableFuture.get();
Object objV2 = completableFutureV2.get();
if (flag.get() == false){
  throw new RuntimeException("xxx异常");
}
相关文章
|
6月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
73 2
|
6月前
|
并行计算 Java
异步编程好帮手之CompletableFuture详解
异步编程好帮手之CompletableFuture详解
79 0
|
29天前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
6月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
Java
异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析
异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析
62 0
|
Java
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture
69 0
|
消息中间件 存储 Java
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
Java 8 的异步利器:CompletableFuture源码解析(建议精读)
实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage 接口去支持完成时触发的函数和操作。
|
Java API 调度
【并发编程】异步编程CompletableFuture实战
【并发编程】异步编程CompletableFuture实战
【并发编程】异步编程CompletableFuture实战