【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(下)
Demo:


JDK8的CompletableFuture 自带多任务组合方法allOf和anyOf


  • allOf是等待所有任务完成,构造后CompletableFuture完成


  • anyOf是只要有一个任务完成,构造后CompletableFuture就完成


    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }


方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式一:循环创建CompletableFuture list, 然后组装 组装返回一个有返回值的CompletableFuture,返回结果get()获取
            for (int i = 0; i < taskList.size(); i++) {
                final int j = i;
                //异步执行  拿到每个有返回值的CompletableFuture对象
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(taskList.get(j)), exs)
                        //Integer转换字符串    thenAccept只接受不返回不影响结果
                        .thenApply(e -> Integer.toString(e))
                        //如需获取任务完成先后顺序,此处代码即可
                        .whenComplete((v, e) -> {
                            System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                            list2.add(v);
                        });
                futureList.add(future);
            }
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            //1.构造一个空CompletableFuture,子任务数为入参任务list size
            CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.stream()
                    .filter(f -> f != null).collect(toList()).toArray(new CompletableFuture[futureList.size()]));
            //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
            list = allDoneFuture.thenApply(v -> futureList.stream().map(CompletableFuture::join).collect(toList())).get();
            //流式获取结果:此处是根据任务添加顺序获取的结果========================
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:27:24 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:27:24 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:27:24 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:27:24 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:27:24 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:27:24 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:27:24 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:27:24 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:27:24 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:27:26 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:27:26 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:27:28 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:27:28 CST 2018
任务完成先后顺序,结果list2=[2, 4, 3, 6, 8, 7, 10, 9, 1, 5];任务提交顺序,结果list=[2, 1, 3, 4, 5, 6, 7, 8, 9, 10],耗时=5141


方式二:全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。—》推荐


/**
 * 多线程并发任务,取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:53
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<String> list = new ArrayList<>();
        List<String> list2 = new ArrayList<>();
        final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        try {
            //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture<Integer>[] cfs = taskList.stream().map(i ->
                    //把计算任务 交给CompletableFuture异步去处理执行
                    CompletableFuture.supplyAsync(() -> calc(i), exs)
                            // 把计算完成结果做Function处理:此处是转换成了字符串
                            .thenApply(h -> Integer.toString(h))
                            //如需获取任务完成先后顺序,此处代码即可  会先处理先完成的任务 后处理后完成的任务 使用起来比CompletionService确实方便不少
                            .whenComplete((v, e) -> {
                                System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                                list2.add(v);
                            })).toArray(CompletableFuture[]::new); //此处直接toArray 不toList了
            //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取  此处使用join来获取结果
            CompletableFuture.allOf(cfs).join();
            System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    //模拟任务的耗时方法
    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                //任务1耗时3秒
                Thread.sleep(3000);
            } else if (i == 5) {
                //任务5耗时5秒
                Thread.sleep(5000);
            } else {
                //其它任务耗时1秒
                Thread.sleep(1000);
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}


输出:


task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:19:41 CST 2018
任务4完成!result=4,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:19:41 CST 2018
任务8完成!result=8,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:19:41 CST 2018
任务2完成!result=2,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务10完成!result=10,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务3完成!result=3,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务7完成!result=7,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:19:41 CST 2018
任务9完成!result=9,异常 e=null,Wed Oct 31 12:19:41 CST 2018
任务6完成!result=6,异常 e=null,Wed Oct 31 12:19:41 CST 2018
task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:19:43 CST 2018
任务1完成!result=1,异常 e=null,Wed Oct 31 12:19:43 CST 2018
task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:19:45 CST 2018
任务5完成!result=5,异常 e=null,Wed Oct 31 12:19:45 CST 2018
任务完成先后顺序,结果list2=[4, 8, 2, 10, 3, 7, 9, 6, 1, 5];任务提交顺序,结果list=[],耗时=5166  ---》符合逻辑,10个任务,10个线程并发执行,其中任务1耗时3秒,任务5耗时5秒,耗时取最大值。


建议:CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。


总结


本文从原理、demo、建议三个方向分析了常用多线程并发,取结果归集的几种实现方案,希望对大家有所启发,整理表格如下:



image.png

相关文章
|
2天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
2天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
2天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
13 3
|
2天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
19 2
|
10天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
40 6
|
2天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
12 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
60 1
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
32 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
25 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
41 2