CountDownLatch翻车后,大家都建议我用CompletableFuture改造下,改造完感觉真香啊!

简介: 前段时间使用了CountDownLatch来做并发流程的控制,在生产上碰到了一些问题,最终问题是解决了,但是那篇文章的评论大家让我用CompletableFuture来试一试,改造完之后,发现CompletableFuture这东西真强大,有种相见恨晚的感觉。

前言

大家好,我是小郭,前段时间使用了CountDownLatch来做并发流程的控制,在生产上碰到了一些问题,最终问题是解决了,但是那篇文章的评论大家让我用CompletableFuture来试一试,改造完之后,发现CompletableFuture这东西真强大,有种相见恨晚的感觉。

上篇文章

# 以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了

可以来这篇文章看一下具体的业务场景

CompletableFuture改造

我先直接分享一下我是如何使用CompletableFuture的吧

// 下载文件总数,初始化
List<Integer> resultList = new ArrayList<>(1000);
ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
IntStream.range(0,1000).forEach(resultList::add);
public List<R> sendAsyncBatch(List<P> list, Executor executor, TaskLoader<R,P> loader) {
    List<R> resultList = Collections.synchronizedList(Lists.newArrayList());
    if (CollectionUtils.isNotEmpty(list)) {
        Executor finalExecutor = executor;
        // 将任务拆分分成每50个为一个任务
        CollUtil.split(list, 50)
                .forEach(tempList -> {
                    CompletableFuture[] completableFutures = tempList.stream()
                            .map(p -> CompletableFuture.supplyAsync(() -> {
                                try {
                                    return loader.load(p);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                return null;
                            }, finalExecutor)
                                    .handle((result, throwable) -> {
                                        if (Objects.nonNull(throwable)) {
                                            //log.error("async error:{}", throwable.getMessage());
                                        } else if (Objects.nonNull(result)) {
                                            //log.info("async success:{}", result);
                                        } else {
                                            //log.error("async result is null");
                                        }
                                        return result;
                                    }).whenComplete((r, ex) -> {
                                        if (Objects.nonNull(r)) {
                                            resultList.add((R) r);
                                        }
                                    })
                            ).toArray(CompletableFuture[]::new);
                    CompletableFuture.allOf(completableFutures).join();
                    System.out.println(resultList.size());
                });
    }
    return resultList;
}
// 具体业务逻辑实现接口
@FunctionalInterface
public interface TaskLoader<T,P> {
    T load(P p) throws InterruptedException;
}
//  自定义启动器
ExecutorService executorService = BaseThreadPoolExector.queueExecutor(new ArrayBlockingQueue<>(500));
AsyncTask<Integer, Integer> asyncTask = new AsyncTask();
// 返回结果
List<Integer> list = asyncTask.sendAsyncBatch(resultList, executorService, new TaskLoadImpl());
// 返回结果处理

我先说一下,为什么要CountDownLatch替换掉

  1. CompletableFuture为我们提供更直观、更优美的API。
  2. 在“多个任务等待完成状态”这个应用场景,在遇到异常的情况下我们不需要去手动的抛异常,以免错误处理细节导致阻塞
  3. CompletableFuture也可以定制执行器

但是他也是有缺点的,我个人感觉他的API有点多,看的时候让人眼花。

短短十几行的代码,看到了很多API supplyAsync、handle、whenComplete、allOf

之后我们还会用到runAsync、 thenApply、thenCompose等等其他的。

什么是CompletableFuture?

异步编程,利用多线程优化性能这个核心方案得以实施的基础

他的目的也很简单,同一个CPU上执行几个松耦合的任务,充分利用CPU核数,实现最大化吞吐量,避免因为阻塞造成等待时间过长;

1. 要区分并发与并行的区别

我们还需要特别的注意这两个概念不能混淆

并发:在一个CPU上串行执行

并行:多个CPU上同时执行任务

2. Future接口

CompletableFuture主要继承了Future接口,但是他比Future接口丰富的很多

// 取消
boolean cancel(boolean mayInterruptIfRunning);
// 判断是否取消
boolean isCancelled();
//是否异步计算是否已经结束
boolean isDone();
// 获取计算结果
V get() throws InterruptedException, ExecutionException;
// 设置最长计算时间,返回计算结果
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

网络异常,图片无法展示
|

网络异常,图片无法展示
|

可以看到Future接口的局限性,主要是用起来不省事 举个例子:A线程执行完之后通知B线程执行

ExecutorService executorService = BaseThreadPoolExector.calculateExecutor();
Future<String> futureA = executorService.submit(() -> Thread.currentThread().getName());
System.out.println(futureA.get());
if (futureA.isDone()){
    Future<String> futureB = executorService.submit(() -> Thread.currentThread().getName());
    System.out.println(futureB.get());
}
executorService.shutdown();

这里我们就需要查询futureA.isDone()结果,然后再去执行B线程的业务

而 CompletableFuture 操作起来就便捷很多了

CompletableFuture<String> completableFuture = CompletableFuture
        .supplyAsync(() -> Thread.currentThread().getName(), executorService)
        .thenApply(s -> Thread.currentThread().getName());
System.out.println(completableFuture.get());
准备执行
计划执行
supplyAsync result pool-1-thread-1, thenApply result main
线程退出

supplyAsync执行完成之后,再去执行thenApply

没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;

3. 错误处理细节,避免造成阻塞

CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() ->{
    try {
        completableFuture.complete(10/0);
    }catch (Exception ex){
        //ex.printStackTrace();
        completableFuture.completeExceptionally(ex);
    }
}).start();
try {
    System.out.println(completableFuture.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

注意到catch里面的completeExceptionally函数了吧,

这个主要的作用就是为了抛出异常,

如果缺少了他,就会造成completableFuture.get()一直处于等待造成阻塞,

与此同时,没有为我们抛出异常信息。

所以CompletableFuture的API优美之处又要体现出来了

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    int kk = 10 / 0;
    return kk;
}).handle((result, throwable) -> {
    System.out.println(result);
    System.out.println(throwable.getMessage());
    return result;
}).whenComplete((result ,throwable) -> System.out.println(result));

supplyAsync配合着 handle 和 whenComplete,将异常和结果进行处理.

handle 和 whenComplete的区别

whenComplete
public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
handle
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

whenComplete是BiConsumer也就是直接消费不返回值,不对结果产生影响

如果单独使用whenComplete的时候,没有进行抛出异常的处理会造成阻塞

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            int kk = 10 / 0;
            return kk;
        })
                .whenComplete((r, ex) -> {
                    if (Objects.nonNull(ex)) {
                        System.out.println("whenComplete>>>" + ex.getMessage());
                    }
                })
                .exceptionally(throwable -> {
                    System.out.println("exceptionally>>>" + throwable.getMessage());
                    return null;
                });

handle是BiFunction也就是需要返回值,对结果产生影响

需要注意的是,在handle中对结果修改,要避免结果对象为空,如果没有判断直接进行操作会出现空指针异常造成阻塞

在这里出现空指针异常,如果没有exceptionally将异常抛出,则会造成阻塞

了解API

欲善其功,必先利其器

我们主要从这三种关系下手去了解和使用API 涉及接口

CompletionStage<R> thenApply(fn);   
CompletionStage<R> thenApplyAsync(fn);           
CompletionStage<Void> thenAccept(consumer);      
CompletionStage<Void> thenAcceptAsync(consumer); 
CompletionStage<Void> thenRun(action);           
CompletionStage<Void> thenRunAsync(action);      
CompletionStage<R> thenCompose(fn);              
CompletionStage<R> thenComposeAsync(fn);

thenApply函数里参数入参Function<? super T,? extends U> fn,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply函数出参的是CompletionStage<R>

thenAccept类型函数入参Consumer<? super T> action是一个消费类型的,回参是CompletionStage<Void>所以thenAccept类型函数不会有返回值。

thenRun函数入参Runnable action,回参CompletionStage<Void>,所以既不能接收参数也不支持返回值。

thenCombine函数入参CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,回参CompletableFuture<V>是支持返回值的,他的作用主要使用BiFunction处理两个阶段的结果

我们只需要注意他的入参、回参和函数后缀就能够区分出他们的不同

1. CompletableFuture中的串行化关系

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{
    //int kk = 10/0;
    return Thread.currentThread().getName() + ":小郭";
},executorService).thenApply(s -> {
    return s + "拿茶叶";
}).thenApply(a ->{
    return a + ",泡茶去";
}).handle((result, ex) ->{
    if (ex != null){
        System.out.println(ex.getMessage());
    }
    return result;
}).whenComplete((r, ex) ->{
    System.out.println(r);
});
task1.join();

执行结果:

准备执行
计划执行
pool-1-thread-1:小郭拿茶叶,泡茶去

可以看到,是按照之上而下的顺序去执行的supplyAsync、thenApply、thenApply 如果第二阶段任务没有拿到第一阶段的结果,他就会等待

2. CompletableFuture中的汇聚AND关系

CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
    int t = new Random().nextInt(30);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("task1=" + t);
    return t;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
    int t = new Random().nextInt(30);
    try {
        Thread.sleep(t);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("task2=" + t);
    return t;
});
CompletableFuture<Integer> task3 = task1.thenCombineAsync(task2, Integer::sum);
task3.join();

等待task1和task2执行完成,task再进行处理

执行结果

task1=1
task2=3
4

3. CompletableFuture中的汇聚OR关系

CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{
    int t = new Random().nextInt(5);
    try {
        Thread.sleep(t * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("task1=" + t);
    return t;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{
    int t = new Random().nextInt(5);
    try {
        Thread.sleep(t * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("task2=" + t);
    return t;
});
CompletableFuture<Integer> task3 = task1.applyToEither(task2, s ->s);
task3.join();

谁先执行完先输出谁,如果相同时间执行完,则一起数据

执行结果

我快我先来 task2=2
我快我先来 task1=2
2
我快我先来 task2=0
0

实现List任务并行执行的方式

  1. 并行流进行操作
  2. 使用CompletableFuture发起异步请求,最后使用join等待所有异步操作结束

为了更好的发挥出CompletableFuture,需要采用定制的执行器

那这两个如何选择?

  1. 进行计算密集型,并且没有I/O操作,推荐使用Sream并行流,没必要创建更多的线程,线程过多反而是一种浪费
  2. 涉及I/O等待的操作,CompletableFuture的灵活性会更高

现在回过头看一下,我上面的改造方法,是不是就感觉清晰了许多,不足的地方大家提出来

总结

  1. 这篇文章我主要是根据大家的建议,使用了Java8的CompletableFuture 来进行了原来的业务功能改造.
  2. 在执行比较耗时的业务操作时候可以使用异步编程来提高性能,加快程序的处理速度
  3. 在处理异常机制的时候,往往是让我们很头痛的,担心线程中出现的异常没有及时捕获,造成程序的阻塞或者其他方面的影响,CompletableFuture 提供了优秀的异常管理机制。
  4. CompletableFuture 还提供了 串行、聚合、优先输出的函数,更贴切业务需求做出最好的选择。
相关文章
|
5月前
|
存储 Java 索引
(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析下篇
在《(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇》中,我们曾初步了解了ForkJoin分支合并框架的使用,也分析框架的成员构成以及任务提交和创建工作的原理实现,在本篇则会对框架的任务执行、任务扫描、线程挂起、结果合并以及任务窃取的源码实现进行分析。
|
5月前
|
存储 监控 Java
(十一)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇
在上篇文章《深入理解并发之Java线程池、工作原理、复用原理及源码分析》中,曾详细谈到了Java的线程池框架。在其中也说到了JDK提供的四种原生线程池以及自定义线程池,而本文则再来详细谈谈JDK1.7中新推出的线程池:ForkJoinPool。
|
设计模式 存储 安全
Java的第十三篇文章——JAVA多线程(后期再学一遍)
Java的第十三篇文章——JAVA多线程(后期再学一遍)
|
消息中间件 存储 Java
一网打尽异步神器CompletableFuture
最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
|
设计模式 SQL Java
有点狠有点猛,我用责任链模式重构了业务代码
文章开篇,抛出一个老生常谈的问题,学习设计模式有什么作用? 设计模式主要是为了应对代码的复杂性,让其满足开闭原则,提高代码的扩展性 另外,学习的设计模式 一定要在业务代码中落实,只有理论没有真正实施,是无法真正掌握并且灵活运用设计模式的 这篇文章主要说 责任链设计模式,认识此模式是在读 Mybatis 源码时, Interceptor 拦截器主要使用的就是责任链,当时读过后就留下了很深的印象(内心 OS:还能这样玩)
|
消息中间件 JavaScript 小程序
以为很熟悉 CountDownLatch,万万没想到在生产环境翻车了.....
以为很熟悉 CountDownLatch,万万没想到在生产环境翻车了.....
|
消息中间件 JavaScript 小程序
一网打尽:异步神器 CompletableFuture 万字详解!
一网打尽:异步神器 CompletableFuture 万字详解!
|
Java 数据库
以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了
我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。
JUC之FutureTask源码深度剖析 ✨ 每日积累
JUC之FutureTask源码深度剖析 ✨ 每日积累
JUC之FutureTask源码深度剖析 ✨ 每日积累
|
存储 安全 Java
阿里巴巴面试题- - -多线程&并发篇(三十四)
阿里巴巴面试题- - -多线程&并发篇(三十四)