【Java技术指南】「并发编程专题」CompletionService框架基本使用和原理探究(基础篇)

简介: 【Java技术指南】「并发编程专题」CompletionService框架基本使用和原理探究(基础篇)

前提概要


在开发过程中在使用多线程进行并行处理一些事情的时候,大部分场景在处理多线程并行执行任务的时候,可以通过List添加Future来获取执行结果,有时候我们是不需要获取任务的执行结果的,方便后面引出ExecutorCompletionService。




CompletionService的介绍


  • CompletionService 接口是一个独立的接口,并没有扩展ExecutorService 。 其默认实现类是ExecutorCompletionService。
  • 接口CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。从而可以将任务的执行与处理任务的执行结果分离开来。



CompletionService的实现原理


  • or线程池执行的任务,用BlockingQueue将完成的任务的结果存储下来。
  • 要不断遍历与每个任务关联的Future,然后不断去轮询,判断任务是否已经完成,功能比较繁琐。
public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
复制代码



方法摘要


提交一个 Callable 任务;一旦完成,便可以由take()、poll()方法获取

Future submit(Callable task):
复制代码


提交一个 Runnable 任务,并指定计算结果;

Future submit(Runnable task, V result):
复制代码


获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。

Future take() throws InterruptedException
复制代码


获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null。

Future poll()
复制代码


获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。

Future poll(long timeout, TimeUnit unit) throws InterruptedException
复制代码

例子,程序提交了多个任务,但只要有一个任务完成并返回一个非空的结果,并可以忽略掉其余的任务。

void eample(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
     CompletionService<Result> completionService = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         //提交多个任务
         for (Callable<Result> s : solvers)
             futures.add(completionService.submit(s));
        //
         for (int i = 0; i < n; ++i) {
             try {
                 //等待获取一个已经完成的任务
                 Result r = completionService.take().get();
                 //判断返回结果是否为空
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         //取消所有任务
         for (Future<Result> f : futures)
               f.cancel(true);
         }
     if (result != null)
         use(result);
 }
复制代码

ExecutorCompletionService的介绍


  • ExecutorCompletionService内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。


  • ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范,ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果,该类实现了接口CompletionService




构造方法


  • 指定一个Executor来执行任务,存储完成的任务的完成队列是LinkedBlockingQueue ;


  • Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。
ExecutorCompletionService(Executor executor):
复制代码


指定了任务执行器Executor和已完成的任务队列completionQueue

ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue)
复制代码


实现构造器
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
复制代码


  • 该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。
  • 包装后提交任务的submit()方法,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。
public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
}
复制代码



QueueingFuture为内部类:


在提交任务时,将任务封装成QueueingFuture:

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
}
复制代码


其中,done()方法就是在任务执行完毕后,将任务放入队列中。


  • 在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。


  • 它继承自 FutureTask,并且重写了 done 方法,其方法把任务放到我们包装线程池创建的堵塞队列里面;就是当任务执行完成后,就会被放到队列里面去了。


  • 调用其take() 方法,就是阻塞等待,等到的一定是能够获取的结果的future,然后再调用get()方法获取执行结果;


最后,如果工作中并行处理任务不需要获取结果的,我们正常使用线程池提交就可以,任务技术只要适合工作的业务场景就是好的。



相关文章
|
15小时前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
16 4
Java并发编程学习11-任务执行演示
|
1天前
|
缓存 Java 数据库
Java并发编程中的锁优化策略
【5月更文挑战第9天】 在高负载的多线程应用中,Java并发编程的高效性至关重要。本文将探讨几种常见的锁优化技术,旨在提高Java应用程序在并发环境下的性能。我们将从基本的synchronized关键字开始,逐步深入到更高效的Lock接口实现,以及Java 6引入的java.util.concurrent包中的高级工具类。文中还会介绍读写锁(ReadWriteLock)的概念和实现原理,并通过对比分析各自的优势和适用场景,为开发者提供实用的锁优化策略。
2 0
|
1天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
13 0
|
2天前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
10 0
|
2天前
|
前端开发 安全 Java
使用Spring框架加速Java开发
使用Spring框架加速Java开发
5 0
|
2天前
|
存储 安全 Java
深入理解Java集合框架
深入理解Java集合框架
8 0
|
2天前
|
Java 编译器 开发者
Java并发编程中的锁优化策略
【5月更文挑战第8天】在Java并发编程中,锁是实现线程同步的关键机制。为了提高程序的性能,我们需要对锁进行优化。本文将介绍Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级和读写锁等方法,以帮助开发者提高多线程应用的性能。
|
3天前
|
存储 监控 安全
JVM工作原理与实战(十六):运行时数据区-Java虚拟机栈
JVM作为Java程序的运行环境,其负责解释和执行字节码,管理内存,确保安全,支持多线程和提供性能监控工具,以及确保程序的跨平台运行。本文主要介绍了运行时数据区、Java虚拟机栈等内容。
|
3天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第7天】在Java中,多线程编程是提高应用程序性能和响应能力的关键。本文将深入探讨Java并发编程的核心概念,包括线程安全、同步机制以及性能优化策略。我们将通过实例分析,了解如何避免常见的并发问题,如死锁、竞态条件和资源争用,并学习如何使用Java提供的并发工具来构建高效、可靠的多线程应用。
|
16天前
|
SQL Java 数据库连接
Java从入门到精通:3.1.2深入学习Java EE技术——Hibernate与MyBatis等ORM框架的掌握
Java从入门到精通:3.1.2深入学习Java EE技术——Hibernate与MyBatis等ORM框架的掌握