【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)
Demo:


JDK8的CompletableFuture 自带多任务组合方法allOf和anyOf


  • allOf是等待所有任务完成,构造后CompletableFuture完成


  • anyOf是只要有一个任务完成,构造后CompletableFuture就完成


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }


方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式一:循环创建CompletableFuture list, 然后组装 组装返回一个有返回值的CompletableFuture,返回结果get()获取
            for (int i = 0; i < taskList.size(); i++) {
                final int j = i;
                //异步执行  拿到每个有返回值的CompletableFuture对象
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(taskList.get(j)), exs)
                        //Integer转换字符串    thenAccept只接受不返回不影响结果
                        .thenApply(e -> Integer.toString(e))
                        //如需获取任务完成先后顺序,此处代码即可
                        .whenComplete((v, e) -> {
                            System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                            list2.add(v);
                        });
                futureList.add(future);
            }
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            //1.构造一个空CompletableFuture,子任务数为入参任务list size
            CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.stream()
                    .filter(f -> f != null).collect(toList()).toArray(new CompletableFuture[futureList.size()]));
            //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
            list = allDoneFuture.thenApply(v -> futureList.stream().map(CompletableFuture::join).collect(toList())).get();
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:27:24 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:27:24 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:27:24 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:27:24 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:27:24 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:27:24 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:27:24 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:27:24 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:27:26 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:27:26 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:27:28 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:27:28 CST 2018
任务完成先后顺序,结果list2=[2, 4, 3, 6, 8, 7, 10, 9, 1, 5];任务提交顺序,结果list=[2, 1, 3, 4, 5, 6, 7, 8, 9, 10],耗时=5141


方式二:全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。—》推荐


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture<Integer>[] cfs = taskList.stream().map(i ->
                    //把计算任务 交给CompletableFuture异步去处理执行
                    CompletableFuture.supplyAsync(() -> calc(i), exs)
                            // 把计算完成结果做Function处理:此处是转换成了字符串
                            .thenApply(h -> Integer.toString(h))
                            //如需获取任务完成先后顺序,此处代码即可  会先处理先完成的任务 后处理后完成的任务 使用起来比CompletionService确实方便不少
                            .whenComplete((v, e) -> {
                                System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                                list2.add(v);
                            })).toArray(CompletableFuture[]::new); //此处直接toArray 不toList了
            //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取  此处使用join来获取结果
            CompletableFuture.allOf(cfs).join();
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:19:41 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:19:41 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:19:41 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:19:41 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:19:43 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:19:43 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:19:45 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:19:45 CST 2018
任务完成先后顺序,结果list2=[4, 8, 2, 10, 3, 7, 9, 6, 1, 5];任务提交顺序,结果list=[],耗时=5166  ---》符合逻辑,10个任务,10个线程并发执行,其中任务1耗时3秒,任务5耗时5秒,耗时取最大值。


建议:CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。


总结


本文从原理、demo、建议三个方向分析了常用多线程并发,取结果归集的几种实现方案,希望对大家有所启发,整理表格如下:



image.png

相关文章
|
8天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
28 9
|
11天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
8天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
11天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
25 3
|
9天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
10天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
22 1
|
11天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
11天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
39 1
|
3月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
69 1
|
15天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####