【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(中)

简介: 【小家Java】Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比(中)

3、CompletionService


如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果,怎么办呢?

Java8之前的做法是让返回Futrue,然后调用其get阻塞方法即可。这样做固然可以,但却相当乏味。幸运的是,Java8提供了一个更好的方法:完成服务 (CompletionService)。


CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个使用BlockingQueue打包的Future。


CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。


原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。


/**
 * CompletionService多线程并发任务结果归集
 *
 * @author fangshixiang@vipkid.com.cn
 * @description //
 * @date 2018/10/31 11:29
 */
public class CompletionServiceDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //开启10个线程
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //结果集
        List<Integer> list = new ArrayList<>();
        List<Future<Integer>> futureList = new ArrayList<>();
        try {
            int taskCount = 10;
            //1.定义CompletionService ExecutorCompletionService是此接口的唯一实现类 需要把线程池传进去
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            //2.添加任务(向CompletionService添加任务 然后把返回的futrue添加到futureList即可)
            for (int i = 0; i < taskCount; i++) {
                futureList.add(completionService.submit(new Task(i + 1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果  (若是按照提交顺序,那和Futrue的Demo结果将一样,没啥优势可言)
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }
//            //方法2.使用内部阻塞队列的take():内部维护阻塞队列,任务先完成的先获取到
            for (int i = 0; i < taskCount; i++) {
                Integer result = completionService.take().get();//采用completionService.take(),
                System.out.println("任务i==" + result + "完成!" + new Date());
                list.add(result);
            }
            System.out.println("list=" + list);
            System.out.println("总耗时=" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }
    }
    static class Task implements Callable<Integer> {
        Integer i;
        public Task(Integer i) {
            super();
            this.i = i;
        }
        @Override
        public Integer call() throws Exception {
            if (i == 5) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!");
            return i;
        }
    }
}


输出:


线程:pool-1-thread-4任务i=4,执行完成!
任务i==4完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-3任务i=3,执行完成!
任务i==3完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-2任务i=2,执行完成!
任务i==2完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-1任务i=1,执行完成!
任务i==1完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-8任务i=8,执行完成!
任务i==8完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-7任务i=7,执行完成!
任务i==7完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-6任务i=6,执行完成!
任务i==6完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-10任务i=10,执行完成!
任务i==10完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-9任务i=9,执行完成!
任务i==9完成!Wed Oct 31 11:33:35 CST 2018
线程:pool-1-thread-5任务i=5,执行完成!
任务i==5完成!Wed Oct 31 11:33:39 CST 2018
list=[4, 3, 2, 1, 8, 7, 6, 10, 9, 5] ---》这里证实了确实按照执行完成顺序排序
总耗时=5019 ---》符合逻辑,10个任务,定长5线程池执行,取最长时间。


建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。


优势:能够按照任务完成时间排序,所有有排序需求的,可以考虑使用它。这也是JDK8以前最佳选择

4、CompletableFuture


JDK1.8才新加入的一个实现类,实现了Future, CompletionStage2个接口(CompletionStage接口也是1.8才提供的)


CompletableFuture的简单介绍::


当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。


CompletableFuture实现了CompletionStage接口的如下策略:


1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作


2.没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例


3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖


CompletableFuture实现了Futurre接口的如下策略:


       CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任何异常的方式完成。


       以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException


CompletionStage接口实现流式编程:


JDK8新增接口,此接口包含38个方法…是的,你没看错,就是38个方法。这些方法主要是为了支持函数式编程中流式处理。


CompletableFuture中4个异步执行任务静态方法:


   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }


如上图,其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。


组合CompletableFuture:


thenCombine(): 先完成当前CompletionStage和other 2个CompletionStage任务,然后把结果传参给BiFunction进行结果合并操作


三个重载方法如下:


    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }


thenCompose():第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。

三个重载方法如下:


    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

…]… …等等类似的重载方法有很多,后续会专门开博文讲述它的使用方式,请持续关注我的博客

【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)

相关文章
|
20小时前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
22小时前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
1天前
|
缓存 Java
【Java基础】简说多线程(上)
【Java基础】简说多线程(上)
5 0
|
2天前
|
Java API 调度
[Java并发基础]多进程编程
[Java并发基础]多进程编程
|
2天前
|
并行计算 算法 安全
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
Java从入门到精通:2.1.3深入学习Java核心技术——掌握Java多线程编程
|
2天前
|
安全 Java 编译器
是时候来唠一唠synchronized关键字了,Java多线程的必问考点!
本文简要介绍了Java中的`synchronized`关键字,它是用于保证多线程环境下的同步,解决原子性、可见性和顺序性问题。从JDK1.6开始,synchronized进行了优化,性能得到提升,现在仍可在项目中使用。synchronized有三种用法:修饰实例方法、静态方法和代码块。文章还讨论了synchronized修饰代码块的锁对象、静态与非静态方法调用的互斥性,以及构造方法不能被同步修饰。此外,通过反汇编展示了`synchronized`在方法和代码块上的底层实现,涉及ObjectMonitor和monitorenter/monitorexit指令。
15 0
|
8月前
|
监控 Java API
|
8月前
|
Java 容器
Java——使用多线程模拟真实高并发业务并保证安全性(二)
Java——使用多线程模拟真实高并发业务并保证安全性(二)
|
8月前
|
Java 容器
Java——使用多线程模拟真实高并发业务并保证安全性(一)
Java——使用多线程模拟真实高并发业务并保证安全性(一)
|
8月前
|
Java 程序员 开发者
疫情过后,Java开发者如何应对多线程与高并发面试题目?
发某粉丝年前参加某个NB企业的面试题列表: 聊聊synchronized的CPU原语级别实现 有一千万个数,写一个程序进行高效求和 已知2开平方为1.414,如何不用数学库,求开平方的值,精确到小数点儿后面10位 编码实现两个线程,线程A不断打印1-10的数字,要求在打印到第五个数字的时候通知线程B 自定义线程池需要指定哪7个参数,为什么不建议使用JUC内置线程池? 高并发、任务执行时间短的业务怎样使用线程池? 并发不高、任务执行时间长的业务怎样使用线程池? 并发高、业务执行时间长的业务怎样使用线程池? 设计一个12306网站,能够撑住最高百万级别TPS(淘宝最高54万TPS),你该如何实现

热门文章

最新文章