【Java深层系列】「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南

简介: 【Java深层系列】「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南

image.png



CompletionService基本介绍


  • CompletionService与ExecutorService类似都可以用来执行线程池的任务,ExecutorService继承了Executor接口,而CompletionService则是一个接口。


  • 主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务(task)的异步性,只要为每个task创建一个线程就实现了任务的异步性。


在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。另外,Executor框架还支持同步任务的执行,就是在execute方法中调用提交任务的run()方法就属于同步调用,当我们采用异步的时候,需要进行的就是获取Future对象,之后在需要使用的时候get出来结果即可。




异步调用判断机制


一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。为了更精确实现任务的异步执行以及更简便的完成任务的异步执行,可以使用CompletionService




CompletionService实现原理


CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。


CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。




QueueingFuture的源码如下


  • ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。


  • 当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。


private class QueueingFuture extends FutureTask<Void> {
       QueueingFuture(RunnableFuture<V> task) {
           super(task, null);
           this.task = task;
       }
       protected void done() { completionQueue.add(task); }
       private final Future<V> task;
   }
复制代码

CompletionService将提交的任务转化为QueueingFuture,并且覆盖了done方法,在done方法中就是将任务加入任务队列中。



使用ExecutorService实现任务


比如:电商中加载商品详情这一操作,因为商品属性的多样性,将商品的图片显示与商品简介的显示设为两个独立执行的任务。


另外,由于商品的图片可能有许多张,所以图片的显示往往比简介显示更慢。这个时候异步执行能够在一定程度上加快执行的速度提高系统的性能。

public class DisplayProductInfoWithExecutorService {
    //线程池
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    //日期格式器
    private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
    // 由于可能商品的图片可能会有很多张,所以显示商品的图片往往会有一定的延迟
    // 除了商品的详情外还包括商品简介等信息的展示,由于这里信息主要的是文字为
    // 主,所以能够比图片更快显示出来。下面的代码就以执行这两个任务为主线,完
    // 成这两个任务的执行。由于这两个任务的执行存在较大差距,所以想到的第一个
    // 思路就是异步执行,首先执行图像的下载任务,之后(不会很久)开始执行商品
    // 简介信息的展示,如果网络足够好,图片又不是很大的情况下,可能在开始展示
    // 商品的时候图像就下载完成了,所以自然想到使用Executor和Callable完成异
    // 步任务的执行。
    public void renderProductDetail() {
        final List<ProductInfo>  productInfos = loadProductImages();
        //异步下载图像的任务
        Callable<List<ProductImage>> task = new Callable<List<ProductImage>>() {
            @Override
            public List<ProductImage> call() throws Exception {
                List<ProductImage> imageList = new ArrayList<>();
                for (ProductInfo info : productInfos){
                    imageList.add(info.getImage());
                }
                return imageList;
            }
        };
        //提交给线程池执行
        Future<List<ProductImage>> listFuture = executorService.submit(task);
        //展示商品简介的信息
        renderProductText(productInfos);
        try {
            //显示商品的图片
            List<ProductImage> imageList = listFuture.get();
            renderProductImage(imageList);
        } catch (InterruptedException e) {
            // 如果显示图片发生中断异常则重新设置线程的中断状态
            // 这样做可以让wait中的线程唤醒
            Thread.currentThread().interrupt();
            // 同时取消任务的执行,参数false表示在线程在执行不中断
            listFuture.cancel(true);
        } catch (ExecutionException e) {
            try {
                throw new Throwable(e.getCause());
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
    private void renderProductImage(List<ProductImage> imageList ) {
        for (ProductImage image : imageList){
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " display products images! "
            + format.format(new Date()));
    }
    private void renderProductText(List<ProductInfo> productInfos) {
        for (ProductInfo info : productInfos){
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " display products description! "
            + format.format(new Date()));
    }
    private List<ProductInfo> loadProductImages() {
        List<ProductInfo> list = new ArrayList<>();
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ProductInfo info = new ProductInfo();
        info.setImage(new ProductImage());
        list.add(info);
        System.out.println(Thread.currentThread().getName() + " load products info! "
                + format.format(new Date()));
        return list;
    }
    /**
     * 商品
     */
    private static class ProductInfo{
        private ProductImage image;
        public ProductImage getImage() {
            return image;
        }
        public void setImage(ProductImage image) {
            this.image = image;
        }
    }
    private static class ProductImage{}
    public static void main(String[] args){
        DisplayProductInfoWithExecutorService cd = new DisplayProductInfoWithExecutorService();
        cd.renderProductDetail();
        System.exit(0);
    }
}
复制代码


CompletionService实现任务


使用CompletionService的一大改进就是把多个图片的加载分发给多个工作单元进行处理,这样通过分发的方式就缩小了商品图片的加载与简介信息的加载的速度之间的差距,让这些小任务在线程池中执行,这样就大大降低了下载所有图片的时间,所以在这个时候可以认为这两个任务是同构的。使用CompletionService完成最合适不过了。

public class DisplayProductInfoWithCompletionService {
    //线程池
    private final ExecutorService executorService;
    //日期格式器
    private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
    public DisplayProductInfoWithCompletionService(ExecutorService executorService) {
        this.executorService = executorService;
    }
    public void renderProductDetail() {
        final List<ProductInfo> productInfos = loadProductInfos();
        CompletionService<ProductImage> completionService = new ExecutorCompletionService<ProductImage>(executorService);
        //为每个图像的下载建立一个工作任务
        for (final ProductInfo info : productInfos) {
            completionService.submit(new Callable<ProductImage>() {
                @Override
                public ProductImage call() throws Exception {
                    return info.getImage();
                }
            });
        }
        //展示商品简介的信息
        renderProductText(productInfos);
        try {
            //显示商品图片
            for (int i = 0, n = productInfos.size(); i < n; i++){
                Future<ProductImage> imageFuture = completionService.take();
                ProductImage image = imageFuture.get();
                renderProductImage(image);
            }
        } catch (InterruptedException e) {
            // 如果显示图片发生中断异常则重新设置线程的中断状态
            // 这样做可以让wait中的线程唤醒
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            try {
                throw new Throwable(e.getCause());
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
    private void renderProductImage(ProductImage image) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " display products images! "
                + format.format(new Date()));
    }
    private void renderProductText(List<ProductInfo> productInfos) {
        for (ProductInfo info : productInfos) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " display products description! "
                + format.format(new Date()));
    }
    private List<ProductInfo> loadProductInfos() {
        List<ProductInfo> list = new ArrayList<>();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ProductInfo info = new ProductInfo();
        info.setImage(new ProductImage());
        list.add(info);
        System.out.println(Thread.currentThread().getName() + " load products info! "
                + format.format(new Date()));
        return list;
    }
    /**
     * 商品
     */
    private static class ProductInfo {
        private ProductImage image;
        public ProductImage getImage() {
            return image;
        }
        public void setImage(ProductImage image) {
            this.image = image;
        }
    }
    private static class ProductImage {
    }
    public static void main(String[] args) {
        DisplayProductInfoWithCompletionService cd = new DisplayProductInfoWithCompletionService(Executors.newCachedThreadPool());
        cd.renderProductDetail();
    }
}
复制代码

执行结果与上面的一样。因为多个ExecutorCompletionService可以共享一个Executor,因此可以创建一个特定某个计算的私有的,又能共享公共的Executor的ExecutorCompletionService。



CompletionService解决Future的get方法阻塞问题


解决方法:

CompletionService的take()方法获取最先执行完的线程的Future对象。


测试方法

public static void main(String[] args) throws Exception {
    CallableDemo callable = new CallableDemo(1,100000);
    CallableDemo callable2 = new CallableDemo(1,100);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 5L,TimeUnit.SECONDS, new LinkedBlockingDeque());
    CompletionService csRef = new ExecutorCompletionService(executor);
    System.out.println("main 1 " +System.currentTimeMillis());
    csRef.submit(callable);
    csRef.submit(callable2);
    System.out.println("main 2 " +System.currentTimeMillis());
    System.out.println(csRef.take().get());
    System.out.println("main 3 " +System.currentTimeMillis());
    System.out.println(csRef.take().get());
    System.out.println("main 4 " +System.currentTimeMillis());
}
复制代码


线程类

import java.util.concurrent.Callable;
public class CallableDemo implements Callable<String> {
    private int begin;
    private int end;
    private int sum;
   public CallableDemo(int begin, int end) {
     super();
     this.begin = begin;
     this.end = end;
  }
   public String call() throws Exception {
       for(int i=begin;i<=end;i++){
           for(int j=begin;j<=end;j++){
              sum+=j;
           }
      }
      Thread.sleep(8000);
    return begin+"-" +end+"的和:"+ sum;
   }
}
复制代码



CompletionService小结


相比ExecutorService,CompletionService可以更精确和简便地完成异步任务的执行 CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果 在执行大量相互独立和同构的任务时,可以使用CompletionService CompletionService可以为任务的执行设置时限,主要是通过BlockingQueue的poll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务.




相关文章
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
11天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
7天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
11天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
1天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
1天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
3天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
5天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第13天】 在Java并发编程中,锁是一种常见的同步机制,用于保证多个线程之间的数据一致性。然而,不当的锁使用可能导致性能下降,甚至死锁。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级等方法,以提高程序的执行效率。
12 4
|
6天前
|
Java 调度 开发者
Java 21时代的标志:虚拟线程带来的并发编程新境界
Java 21时代的标志:虚拟线程带来的并发编程新境界
14 0
|
9天前
|
监控 安全 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第10天】 在Java开发中,并发编程是提升应用性能和响应能力的关键手段。然而,线程安全问题和性能调优常常成为开发者面临的挑战。本文将通过分析Java并发模型的核心原理,探讨如何平衡线程安全与系统性能。我们将介绍关键的同步机制,包括synchronized关键字、显式锁(Lock)以及并发集合等,并讨论它们在不同场景下的优势与局限。同时,文章将提供实用的代码示例和性能测试方法,帮助开发者在保证线程安全的前提下,实现高效的并发处理。