【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(中)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(中)

3、CompletionService


如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢?

Java8之前的做法是让返回Futrue,然后调用其get阻塞方法即可。这样做固然可以,但却相当乏味。幸运的是,Java8提供了一个更好的方法:完成服务 (CompletionService)。


CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个使用BlockingQueue打包的Future。


CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。


原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。


/**
 * CompletionService多线程并发任务结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:29
 */
public class CompletionServiceDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //开启10个线程
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<Integer> list = new ArrayList<>();
        List<Future<Integer>> futureList = new ArrayList<>();
        try {
            int taskCount = 10;
            //1.定义CompletionService ExecutorCompletionService是此接口的唯一实现类 需要把线程池传进去
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            //2.添加任务(向CompletionService添加任务 然后把返回的futrue添加到futureList即可)
            for (int i = 0; i < taskCount; i++) {
                futureList.add(completionService.submit(new Task(i + 1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果  (若是按照提交顺序,那和Futrue的Demo结果将一样,没啥优势可言)
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }
//            //方法2.使用内部阻塞队列的take():内部维护阻塞队列,任务先完成的先获取到
            for (int i = 0; i < taskCount; i++) {
                Integer result = completionService.take().get();//采用completionService.take(),
                System.out.println("任务i==" + result + "完成!" + new Date());
                list.add(result);
            }
            System.out.println("list=" + list);
            System.out.println("总耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }
    }
    static class Task implements Callable<Integer> {
        Integer i;
        public Task(Integer i) {
            super();
            this.i = i;
        }
        @Override
        public Integer call() throws Exception {
            if (i == 5) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!");
            return i;
        }
    }
}


输出:


线程:pool-1-thread-4任务i=4,执行完成!
任务i==4完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-3任务i=3,执行完成!
任务i==3完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-2任务i=2,执行完成!
任务i==2完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-1任务i=1,执行完成!
任务i==1完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-8任务i=8,执行完成!
任务i==8完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-7任务i=7,执行完成!
任务i==7完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-6任务i=6,执行完成!
任务i==6完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-10任务i=10,执行完成!
任务i==10完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-9任务i=9,执行完成!
任务i==9完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-5任务i=5,执行完成!
任务i==5完成!Wed Oct 31 11:33:39 CST 2018
list=[4, 3, 2, 1, 8, 7, 6, 10, 9, 5] ---》这里证实了确实按照执行完成顺序排序
总耗时=5019 ---》符合逻辑,10个任务,定长5线程池执行,取最长时间。


建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。


优势:能够按照任务完成时间排序,所有有排序需求的,可以考虑使用它。这也是JDK8以前最佳选择

4、CompletableFuture


JDK1.8才新加入的一个实现类,实现了Future, CompletionStage2个接口(CompletionStage接口也是1.8才提供的)


CompletableFuture的简单介绍::


当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。


CompletableFuture实现了CompletionStage接口的如下策略:


1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作


2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例


3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖


CompletableFuture实现了Futurre接口的如下策略:


       CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。


       以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException


CompletionStage接口实现流式编程:


JDK8新增接口,此接口包含38个方法…是的,你没看错,就是38个方法。这些方法主要是为了支持函数式编程中流式处理。


CompletableFuture中4个异步执行任务静态方法:


   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }


如上图,其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。


组合CompletableFuture:


thenCombine(): 先完成当前CompletionStage和other 2个CompletionStage任务,然后把结果传参给BiFunction进行结果合并操作


三个重载方法如下:


    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }


thenCompose():第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。

三个重载方法如下:


    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

…]… …等等类似的重载方法有很多,后续会专门开博文讲述它的使用方式,请持续关注我的博客

【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)

相关文章
|
2月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
125 0
|
2月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
282 83
|
3月前
|
存储 SQL 安全
Java 无锁方式实现高性能线程实战操作指南
本文深入探讨了现代高并发Java应用中单例模式的实现方式,分析了传统单例(如DCL)的局限性,并提出了多种无锁实现方案。包括基于ThreadLocal的延迟初始化、VarHandle原子操作、Record不可变对象、响应式编程(Reactor)以及CDI依赖注入等实现方式。每种方案均附有代码示例及适用场景,同时通过JMH性能测试对比各实现的优劣。最后,结合实际案例设计了一个高性能配置中心,展示了无锁单例在实际开发中的应用。总结中提出根据场景选择合适的实现方式,并遵循现代单例设计原则以优化性能和安全性。文中还提供了代码获取链接,便于读者实践与学习。
93 0
|
2月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
230 83
|
3月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
108 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
2月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
140 0
|
3月前
|
存储 Java
说一说 JAVA 内存模型与线程
我是小假 期待与你的下一次相遇 ~
|
3月前
|
移动开发 Java
说一说 Java 是如何实现线程间通信
我是小假 期待与你的下一次相遇 ~