【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(上)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(上)

前文


开启线程执行任务,不管是使用Runnable(无返回值不支持上报异常)还是Callable(有返回值支持上报异常)接口,都可以轻松实现。那么如果是开启线程池并需要获取结果归集的情况下,如何实现,以及优劣?


本文将分别以这四种方式解决归集的问题,然后看看效率和使用的方便程度即可


1、Futrue


Future接口封装了取消,获取线程结果,以及状态判断是否取消,是否完成这几个方法,都很有用。


Demo:


使用线程池提交Callable接口任务,返回Future接口,添加进list,最后遍历该List且内部使用while轮询,并发获取结果,代码如下


/**
 * 使用Futrue来实现多线程执行归集操作
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:02
 */
public class FutureDemo {
    public static void main(String[] args) {
        Long start = Instant.now().toEpochMilli();
        //定义一个线程池 方便开启和执行多线程 此处为了方便,直接使用 newFixedThreadPool
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集 装载在list里面
        List<Integer> list = new ArrayList<>();
        List<Future<Integer>> futureList = new ArrayList<>();
        try {
            //1.高速提交10个任务,每个任务返回一个Future入futureList 装载起来  这样10个线程就并行去处理和计算了
            for (int i = 0; i < 10; i++) {
                futureList.add(exs.submit(new CallableTask(i + 1)));
            }
            Long getResultStart = Instant.now().toEpochMilli();
            System.out.println("结果归集开始时间=" + LocalDateTime.now());
            //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<Future<Integer>> iterable = futureList.iterator();
                //遍历 轮询
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    //如果任务完成就立马取结果,并且,并且把该任务直接从futureList移除掉 否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        Integer i = future.get();
                        System.out.println("任务i=" + i + "获取完成,移出任务队列!" + LocalDateTime.now());
                        //把结果装入进去 然后把futrue任务移除
                        list.add(i);
                        iterable.remove();
                    } else {
                        Thread.sleep(1);//避免CPU高速运转(这就是轮询的弊端),这里休息1毫秒,CPU纳秒级别
                    }
                }
            }
            System.out.println("list=" + list); //任务的处理结果
            System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    // 任务 采用sleep模拟处理任务需要消耗的时间
    static class CallableTask implements Callable<Integer> {
        Integer i; //用来编号任务  方便日志里输出识别
        public CallableTask(Integer i) {
            super();
            this.i = i;
        }
        @Override
        public Integer call() throws Exception {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + LocalDateTime.now());
            return i;
        }
    }
}


如上图,开启定长为10的线程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任务1耗时3秒,任务5耗时5秒,其他1秒。控制台打印如下:


结果归集开始时间=2018-10-31T11:01:19.457
task线程:pool-1-thread-2任务i=2,完成!2018-10-31T11:01:19.976
task线程:pool-1-thread-4任务i=4,完成!2018-10-31T11:01:19.977
task线程:pool-1-thread-3任务i=3,完成!2018-10-31T11:01:19.977
任务i=4获取完成,移出任务队列!2018-10-31T11:01:19.978
task线程:pool-1-thread-9任务i=9,完成!2018-10-31T11:01:19.978
task线程:pool-1-thread-8任务i=8,完成!2018-10-31T11:01:19.978
task线程:pool-1-thread-7任务i=7,完成!2018-10-31T11:01:19.978
task线程:pool-1-thread-6任务i=6,完成!2018-10-31T11:01:19.978
任务i=6获取完成,移出任务队列!2018-10-31T11:01:19.979
任务i=7获取完成,移出任务队列!2018-10-31T11:01:19.979
task线程:pool-1-thread-10任务i=10,完成!2018-10-31T11:01:19.979
任务i=8获取完成,移出任务队列!2018-10-31T11:01:19.979
任务i=9获取完成,移出任务队列!2018-10-31T11:01:19.979
任务i=10获取完成,移出任务队列!2018-10-31T11:01:19.979
任务i=2获取完成,移出任务队列!2018-10-31T11:01:19.980
任务i=3获取完成,移出任务队列!2018-10-31T11:01:19.980
task线程:pool-1-thread-1任务i=1,完成!2018-10-31T11:01:21.964
任务i=1获取完成,移出任务队列!2018-10-31T11:01:21.965
task线程:pool-1-thread-5任务i=5,完成!2018-10-31T11:01:23.977
任务i=5获取完成,移出任务队列!2018-10-31T11:01:23.979
list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]
总耗时=5070,取结果归集耗时=5037


看最后的两个结果输出:


list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]--》多执行几遍,最后2个总是1,5最后加进去的,可实现按照任务完成先后顺序获取结果! 因为1需要3s,5需要5s是最慢的,所以最后进入list
总耗时=5046,取结果归集耗时=5040 ---》符合逻辑,10个任务,定长10线程池,其中一个任务耗时3秒,一个任务耗时5秒,由于并发高速轮训,耗时取最长5秒


建议:此种方法可实现基本目标,任务并行且按照完成顺序获取结果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用!

2、FutureTask


FutureTask是接口RunnableFuture的唯一实现类(实现了Future+Runnable).

1.Runnable接口,可开启单个线程执行。

2.Future接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。


demo:


demo1:两个步骤:1.开启单个线程执行任务,2.阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑

/**
 * FutureTask弥补了Future必须用线程池提交返回Future的缺陷,实现功能如下:
 * 这两个步骤:一个开启线程执行任务,一个阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑。
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:15
 */
public class FutureTaskContorlDemo {
    public static void main(String[] args) {
        try {
            System.out.println("=====例如一个统计公司总部和分部的总利润是否达标100万==========");
            //利润 记录总公司的利润综合
            Integer count = 0;
            //1.定义一个futureTask,假设去远程http获取各个分公司业绩(任务都比较耗时).
            FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask());
            Thread futureTaskThread = new Thread(futureTask);
            futureTaskThread.start();
            System.out.println("futureTaskThread start!" + new Date());
            //2.主线程先做点别的事
            System.out.println("主线程查询总部公司利润开始时间:" + new Date());
            Thread.sleep(5000);
            count += 10; //10表示北京集团总部利润。
            System.out.println("主线程查询总部公司利润结果时间:" + new Date());
            //总部已达标100万利润,就不再继续执行获取分公司业绩任务了
            if (count >= 100) {
                System.out.println("总部公司利润达标,取消futureTask!" + new Date());
                futureTask.cancel(true);//不需要再去获取结果,那么直接取消即可
            } else {
                System.out.println("总部公司利润未达标,进入阻塞查询分公司利润!" + new Date());
                //3总部未达标.阻塞获取,各个分公司结果  然后分别去获取分公司的利润
                Integer i = futureTask.get();//真正执行CallableTask
                System.out.println("i=" + i + "获取到结果!" + new Date());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 模拟一个十分耗时的任务  去所有的分公司里去获取利润结果
    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("CallableTask-call,查询分公司利润,执行开始!" + new Date());
            Thread.sleep(10000);
            System.out.println("CallableTask-call,查询分公司利润,执行完毕!" + new Date());
            return 10;
        }
    }
}


FutureTask这个任务,是Thread.start()的时候就开始执行了的。而结果是.get()的时候才会给你(如果提前完成,结果也会先给你缓存在对象内喽。否则get就会阻塞直到有结果了)


注意:倘若你的任务里抛出了异常。那么get方法就会报错从而中断主线程(相当于不需要返回值的异步执行嘛~),但是但是但是,如果你不调用get方法,主线程是不会中断的。


输出:


=====例如一个统计公司总部和分部的总利润是否达标100万==========
futureTaskThread start!Wed Oct 31 11:21:33 CST 2018
主线程查询总部公司利润开始时间:Wed Oct 31 11:21:33 CST 2018
CallableTask-call,查询分公司利润,执行开始!Wed Oct 31 11:21:33 CST 2018
主线程查询总部公司利润结果时间:Wed Oct 31 11:21:38 CST 2018
总部公司利润未达标,进入阻塞查询分公司利润!Wed Oct 31 11:21:38 CST 2018
CallableTask-call,查询分公司利润,执行完毕!Wed Oct 31 11:21:43 CST 2018
i=10获取到结果!Wed Oct 31 11:21:43 CST 2018


如上,分离之后,futureTaskThread耗时10秒期间,主线程还穿插的执行了耗时5秒的操作,大大减小总耗时。且可根据业务逻辑实时判断是否需要继续执行futureTask。


Demo2:FutureTask一样可以并发执行任务并获取结果,如下:


/**
 * FutureTask实现多线程并发执行任务并取结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:26
 */
public class FutureTaskDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<Integer> list = new ArrayList<>();
        List<FutureTask<Integer>> futureList = new ArrayList<>();
        try {
            //启动线程池  和上面Futrue对比,只有这块有点不一样
            for (int i = 0; i < 10; i++) {
                FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask(i + 1));
                //提交任务,添加返回,Runnable特性
                exs.submit(futureTask);
                //Future特性 提交任务后  把futureTask添加进futureList
                futureList.add(futureTask);
            }
            Long getResultStart = System.currentTimeMillis();
            System.out.println("结果归集开始时间=" + new Date());
            //结果归集
            while (futureList.size() > 0) {
                Iterator<FutureTask<Integer>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    if (future.isDone() && !future.isCancelled()) {
                        //Future特性
                        Integer i = future.get();
                        System.out.println("任务i=" + i + "获取完成,移出任务队列!" + new Date());
                        list.add(i);
                        //任务完成移除任务
                        iterable.remove();
                    } else {
                        //避免CPU高速轮循,可以休息一下。
                        Thread.sleep(1);
                    }
                }
            }
            System.out.println("list=" + list);
            System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    static class CallableTask implements Callable<Integer> {
        Integer i;
        public CallableTask(Integer i) {
            super();
            this.i = i;
        }
        @Override
        public Integer call() throws Exception {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + new Date());
            return i;
        }
    }
}


输出:


结果归集开始时间=Wed Oct 31 11:26:41 CST 2018
task线程:pool-1-thread-8任务i=8,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-7任务i=7,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-6任务i=6,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-4任务i=4,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-3任务i=3,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-2任务i=2,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-10任务i=10,完成!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-9任务i=9,完成!Wed Oct 31 11:26:42 CST 2018
任务i=8获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=9获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=10获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=2获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=3获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=4获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=6获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
任务i=7获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018
task线程:pool-1-thread-1任务i=1,完成!Wed Oct 31 11:26:44 CST 2018
任务i=1获取完成,移出任务队列!Wed Oct 31 11:26:44 CST 2018
task线程:pool-1-thread-5任务i=5,完成!Wed Oct 31 11:26:46 CST 2018
任务i=5获取完成,移出任务队列!Wed Oct 31 11:26:46 CST 2018
list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5]
总耗时=5066,取结果归集耗时=5058


建议:


demo1在特定场合例如有十分耗时的业务但有依赖于其他业务不一定非要执行的,可以尝试使用。


demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使用。

相关文章
|
8天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
28 9
|
11天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
8天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
11天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
25 3
|
9天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
10天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
22 1
|
11天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
26天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
19 3
|
26天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2