CompletionService 使用小结

简介: CompletionService 使用小结

本文为博主原创,转载请注明出处:

  实现异步任务时,经常使用 FutureTask 来实现;一个简单的示例代码如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //构建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        FutureTask<Integer> futureTask1 = (FutureTask<Integer>) executorService.submit(()->doTask1());
        FutureTask<Integer> futureTask2 = (FutureTask<Integer>) executorService.submit(()->doTask2());
        //    获取电结果并异步保存
        executorService.execute(()->save(futureTask1.get()));
        //    获取结果并异步保存
        executorService.execute(()->save(futureTask2.get());
         
    }

  上面代码如果 futureTask1 的任务需要执行很长时间,而 futureTask2 执行很短时间,上面代码在执行的过程中,futureTask2 任务的执行也的先等 futureTask1.get() 执行结束后 ,才能保存 futureTask2.get() ;因为这个主线程都阻塞在 futureTask1.get() 的操作上;严重降低了效率。

  此时可以使用 CompletionService 来解决这个问题

  CompletionService 接口的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样就可以将执行任务与处理任务分离开。

  CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果;先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。在执行大量相互独立和同构的任务时,可以使用CompletionService;

  该实现类定义的三个属性:

  在类的注释上有使用的示例,可以参考学习

  使用 CompletionService 实现的示例如下:

public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 创建CompletionService
        CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
        // 用于保存Future对象
        List<Future<Integer>> futures = new ArrayList<>(3);
        //提交异步任务,并保存future到futures
        futures.add(cs.submit(()->doTask1()));
        futures.add(cs.submit(()->doTask2()));
        futures.add(cs.submit(()->doTask3()));
        // 获取最快返回的任务执行结果
        Integer r = 0;
        try {
            // 只要有一个成功返回,则break
            for (int i = 0; i < 3; ++i) {
                r = cs.take().get();
                //简单地通过判空来检查是否成功返回
                if (r != null) {
                    break;
                }
            }
        } finally {
            //取消所有任务
            for(Future<Integer> f : futures)
                f.cancel(true);
        }
        // 返回结果
    }

 

 


标签: java , 高并发

目录
相关文章
|
5月前
|
存储 缓存 安全
(八)深入并发之Runnable、Callable、FutureTask及CompletableFuture原理分析
关于Runnable、Callable接口大家可能在最开始学习Java多线程编程时,都曾学习过一个概念:在Java中创建多线程的方式有三种:继承Thread类、实现Runnable接口以及实现Callable接口。但是实则不然,真正创建多线程的方式只有一种:继承Thread类,因为只有`new Thread().start()`这种方式才能真正的映射一条OS的内核线程执行,而关于实现Runnable接口以及实现Callable接口创建出的Runnable、Callable对象在我看来只能姑且被称为“多线程任务”,因为无论是Runnable对象还是Callable对象,最终执行都要交由Threa
|
7月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
279 0
|
安全 Java
FutureTask详解
本章讲解了FutureTask的用法和使用场景
103 0
JavaThread、Runnable、Callable、线程池的使用
JavaThread、Runnable、Callable、线程池的使用
JavaThread、Runnable、Callable、线程池的使用
|
Java
深入理解FutureTask
我们在日常的多线程编程中,为了充分的利用现在计算机多核的CPU资源,通常是需要开启多个线程来执行相对应的异步任务。在Java中,如果想新建一个线程,就必须要实现Runnable接口或者继承Thread。但是无论这两种方式如何实现,我们都无法获取任务执行的返回结果,那么有没有一种方式是可以获取异步线程返回的结果呢?
812 2
深入理解FutureTask
|
分布式计算 Java 大数据
ForkJoinPool线程池
ForkJoinPool线程池
|
消息中间件 设计模式 Kafka
CompletionService学习
前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS
100 0
CompletionService学习
|
Java
Java多线程 CompletionService和ExecutorCompletionService
Java多线程 CompletionService和ExecutorCompletionService
144 0
Java多线程 CompletionService和ExecutorCompletionService
|
安全
线程池中CompletionService的应用
线程池中CompletionService的应用
114 0
|
存储
多线程 - Callable、Future 和 FutureTask 简单应用(二)
多线程 - Callable、Future 和 FutureTask 简单应用(二)
120 0
多线程 - Callable、Future 和 FutureTask 简单应用(二)