Java多线程 CompletionService和ExecutorCompletionService

简介: Java多线程 CompletionService和ExecutorCompletionService

一、说明


Future的不足


  • 当通过 .get() 方法获取线程的返回值时,会导致阻塞


  • 也就是和当前这个Future关联的计算任务真正执行完成的时候才返回结果


  • 新任务必须等待已完成任务的结果才能继续进行处理,会浪费很多时间,最好是谁最先执行完成谁最先返回


CompletionService的引入


  • 解决阻塞的问题


  • 以异步的方式一边处理新的线程任务,一边处理已完成任务的结果,将执行任务与处理任务分开进行处理


二、理解


CompletionService


  • java.util.concurrent包下CompletionService<V>接口,但并不继承Executor接口,仅有一个实现类ExecutorCompletionService用于管理线程对象


  • 更加有效地处理Future的返回值,避免阻塞,使用.submit()方法执行任务,使用.take()取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果


public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}


  • submit()方法用来执行线程任务


  • take()方法从队列中获取完成任务的Future对象,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果


  • poll()方法获取并删除代表下一个已完成任务的 Future,如果不存在,则返回null,此无阻塞的效果


  • poll(long timeout, TimeUnit unti) timeout表示等待的最长时间,unit表示时间单位,在指定时间内还没获取到结果,则返回null


ExecutorCompletionService


  • java.util.concurrent包下ExecutorCompletionService<V>类实现CompletionService<V>接口,方法与接口相同


  • ExecutorService可以更精确和简便地完成异步任务的执行


  • executor执行任务,completionQueue保存异步任务执行的结果


public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    ……
    Future<V> submit(Callable<V> task) 
    Future<V> submit(Runnable task, V result) 
    Future<V> take() throws InterruptedException
    Future<V> poll() 
    Future<V> poll(long timeout, TimeUnit unit)
    ……
}


  • completionQueue初始化了一个LinkedBlockingQueue类型的先进先出阻塞队列


    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }


  • submit()方法中QueueingFutureExecutorCompletionService中的内部类


    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }


  • QueueingFutureRunnableFuture实例对象赋值给了task,内部的done()方法将task添加到已完成阻塞队列中,调用take()poll()方法获取已完成的Future


    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }


三、实现


1.使用Future


创建CompletionServiceDemo类,创建好的线程对象,使用Executors工厂类来创建ExecutorService的实例(即线程池),通过ThreadPoolExecutor.submit()方法提交到线程池去执行,线程执行后,返回值Future可被拿到


public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        // 3.使用Future提交三个任务到线程池
        Future future_1 = executorService.submit(callable_1);
        Future future_2 = executorService.submit(callable_2);
        Future future_3 = executorService.submit(callable_3);
        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        System.out.println(future_1.get() + "" + getStringDate());
        System.out.println(future_2.get() + "" + getStringDate());
        System.out.println(future_3.get() + "" + getStringDate());
        System.out.println("结束 " + getStringDate());
        // 5.关闭线程池
        executorService.shutdown();
    }
    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}


future_1.get()会等待执行时间阻塞5秒再获取到结果,而在这5秒内future_2future_3的任务已完成,所以会立马得到结果



2.使用ExecutorCompletionService


创建一个ExecutorCompletionService放入线程池实现CompletionService接口,将创建好的线程对象通过CompletionService提交任务和获取结果


public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);
        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println("结束 " + getStringDate());
        // 5.关闭线程池
        executorService.shutdown();
    }
    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}


提交顺序是1-2-3,按照完成这些任务的时间顺序处理它们的结果,返回顺序是3-2-1



3.take()方法


take()方法从队列中获取完成任务的Future对象,会阻塞,一直等待线程池中返回一个结果,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果


如果调用take()方法的次数大于任务数,会因为等不到有任务返回结果而阻塞,只有三个任务,第四次take等不到结果而阻塞



4.poll()方法


poll()方法不会去等结果造成阻塞,没有结果则返回null,接着程序继续往下运行

直接用completionService.poll().get()会引发 NullPointerException



创建一个循环,连续调用poll()方法,每次隔1秒调用,没有结果则返回null



public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);
        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        // 5.创建一个循环,连续调用poll()方法,间隔1秒
        for (int i = 0; i < 8; i++) {
            Future future = completionService.poll();
            if (future!=null){
                System.out.println(future.get() + getStringDate());
            }else {
                System.out.println(future+" "+getStringDate());
            }
            Thread.sleep(1000);
        }  
        System.out.println("结束 " + getStringDate());
        // 6.关闭线程池
        executorService.shutdown();
    }
    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}


5.poll(long timeout, TimeUnit unit)方法


poll(long timeout, TimeUnit unit)方法设置了等待时间,等待超时还没有结果就返回null


不使用 Thread.sleep(1000),将等待时间设置成0.5秒,由于只有8次循环,也就是4秒执行时间,而callable_1需要执行5秒,获取不到结果则返回null



public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };
        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);
        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        // 5.创建一个循环,连续调用poll()方法,间隔1秒
        for (int i = 0; i < 8; i++) {
            Future future = completionService.poll(500, TimeUnit.MILLISECONDS);
            if (future!=null){
                System.out.println(future.get() + getStringDate());
            }else {
                System.out.println(future+" "+getStringDate());
            }
        }
        System.out.println("结束 " + getStringDate());
        // 6.关闭线程池
        executorService.shutdown();
    }
    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}
目录
相关文章
|
6天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
4天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
5天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
4天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
10天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
33 9
|
7天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
13天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
9天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
13天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
27 3
|
11天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。