【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)
Demo:


JDK8的CompletableFuture 自带多任务组合方法allOf和anyOf


  • allOf是等待所有任务完成,构造后CompletableFuture完成


  • anyOf是只要有一个任务完成,构造后CompletableFuture就完成


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }


方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式一:循环创建CompletableFuture list, 然后组装 组装返回一个有返回值的CompletableFuture,返回结果get()获取
            for (int i = 0; i < taskList.size(); i++) {
                final int j = i;
                //异步执行  拿到每个有返回值的CompletableFuture对象
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(taskList.get(j)), exs)
                        //Integer转换字符串    thenAccept只接受不返回不影响结果
                        .thenApply(e -> Integer.toString(e))
                        //如需获取任务完成先后顺序,此处代码即可
                        .whenComplete((v, e) -> {
                            System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                            list2.add(v);
                        });
                futureList.add(future);
            }
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            //1.构造一个空CompletableFuture,子任务数为入参任务list size
            CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.stream()
                    .filter(f -> f != null).collect(toList()).toArray(new CompletableFuture[futureList.size()]));
            //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
            list = allDoneFuture.thenApply(v -> futureList.stream().map(CompletableFuture::join).collect(toList())).get();
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:27:24 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:27:24 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:27:24 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:27:24 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:27:24 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:27:24 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:27:24 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:27:24 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:27:26 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:27:26 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:27:28 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:27:28 CST 2018
任务完成先后顺序,结果list2=[2, 4, 3, 6, 8, 7, 10, 9, 1, 5];任务提交顺序,结果list=[2, 1, 3, 4, 5, 6, 7, 8, 9, 10],耗时=5141


方式二:全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。—》推荐


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture<Integer>[] cfs = taskList.stream().map(i ->
                    //把计算任务 交给CompletableFuture异步去处理执行
                    CompletableFuture.supplyAsync(() -> calc(i), exs)
                            // 把计算完成结果做Function处理:此处是转换成了字符串
                            .thenApply(h -> Integer.toString(h))
                            //如需获取任务完成先后顺序,此处代码即可  会先处理先完成的任务 后处理后完成的任务 使用起来比CompletionService确实方便不少
                            .whenComplete((v, e) -> {
                                System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                                list2.add(v);
                            })).toArray(CompletableFuture[]::new); //此处直接toArray 不toList了
            //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取  此处使用join来获取结果
            CompletableFuture.allOf(cfs).join();
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:19:41 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:19:41 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:19:41 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:19:41 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:19:43 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:19:43 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:19:45 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:19:45 CST 2018
任务完成先后顺序,结果list2=[4, 8, 2, 10, 3, 7, 9, 6, 1, 5];任务提交顺序,结果list=[],耗时=5166  ---》符合逻辑,10个任务,10个线程并发执行,其中任务1耗时3秒,任务5耗时5秒,耗时取最大值。


建议:CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。


总结


本文从原理、demo、建议三个方向分析了常用多线程并发,取结果归集的几种实现方案,希望对大家有所启发,整理表格如下:



image.png

相关文章
|
2月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
282 83
|
2月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
230 83
|
5月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
4月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
183 0
|
4月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
301 0
|
3月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
108 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
2月前
|
SQL 缓存 安全
深度理解 Java 内存模型:从并发基石到实践应用
本文深入解析 Java 内存模型(JMM),涵盖其在并发编程中的核心作用与实践应用。内容包括 JMM 解决的可见性、原子性和有序性问题,线程与内存的交互机制,volatile、synchronized 和 happens-before 等关键机制的使用,以及在单例模式、线程通信等场景中的实战案例。同时,还介绍了常见并发 Bug 的排查与解决方案,帮助开发者写出高效、线程安全的 Java 程序。
140 0
|
4月前
|
存储 缓存 安全
JUC并发—11.线程池源码分析
本文主要介绍了线程池的优势和JUC提供的线程池、ThreadPoolExecutor和Excutors创建的线程池、如何设计一个线程池、ThreadPoolExecutor线程池的执行流程、ThreadPoolExecutor的源码分析、如何合理设置线程池参数 + 定制线程池。
JUC并发—11.线程池源码分析
|
3月前
|
人工智能 Java API
Java并发编程之Future与FutureTask
本文深入解析了Future接口及其实现类FutureTask的原理与使用。Future接口定义了获取任务结果、取消任务及查询任务状态的规范,而FutureTask作为其核心实现类,结合了Runnable与Future的功能。文章通过分析FutureTask的成员变量、状态流转、关键方法(如run、set、get、cancel等)的源码,展示了异步任务的执行与结果处理机制。最后,通过示例代码演示了FutureTask的简单用法,帮助读者更直观地理解其工作原理。适合希望深入了解Java异步编程机制的开发者阅读。