【高并发】深度解析线程池中那些重要的顶层接口和抽象类

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 如果细细品味线程池的底层源码实现,你会发现整个线程池体系的设计是非常优雅的!这些代码的设计值得我们去细细品味和研究,从中学习优雅代码的设计规范,形成自己的设计思想,为我所用!哈哈,说多了,接下来,我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的!

大家好,我是冰河~~

在上一篇《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》一文中,从整体上介绍了Java的线程池。如果细细品味线程池的底层源码实现,你会发现整个线程池体系的设计是非常优雅的!这些代码的设计值得我们去细细品味和研究,从中学习优雅代码的设计规范,形成自己的设计思想,为我所用!哈哈,说多了,接下来,我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的!

通过对线程池中接口和抽象类的分析,你会发现,整个线程池设计的是如此的优雅和强大,从线程池的代码设计中,我们学到的不只是代码而已!!

题外话:膜拜Java大神Doug Lea,Java中的并发包正是这位老爷子写的,他是这个世界上对Java影响力最大的一个人。

一、接口和抽象类总览

说起线程池中提供的重要的接口和抽象类,基本上就是如下图所示的接口和类。

接口与类的简单说明:

  • Executor接口:这个接口也是整个线程池中最顶层的接口,提供了一个无返回值的提交任务的方法。
  • ExecutorService接口:派生自Executor接口,扩展了很过功能,例如关闭线程池,提交任务并返回结果数据、唤醒线程池中的任务等。
  • AbstractExecutorService抽象类:派生自ExecutorService接口,实现了几个非常实现的方法,供子类进行调用。
  • ScheduledExecutorService定时任务接口,派生自ExecutorService接口,拥有ExecutorService接口定义的全部方法,并扩展了定时任务相关的方法。

接下来,我们就分别从源码角度来看下这些接口和抽象类从顶层设计上提供了哪些功能。

二、Executor接口

Executor接口的源码如下所示。

public interface Executor {
    //提交运行任务,参数为Runnable接口对象,无返回值
    void execute(Runnable command);
}

从源码可以看出,Executor接口非常简单,只提供了一个无返回值的提交任务的execute(Runnable)方法。

由于这个接口过于简单,我们无法得知线程池的执行结果数据,如果我们不再使用线程池,也无法通过Executor接口来关闭线程池。此时,我们就需要ExecutorService接口的支持了。

三、ExecutorService接口

ExecutorService接口是非定时任务类线程池的核心接口,通过ExecutorService接口能够向线程池中提交任务(支持有返回结果和无返回结果两种方式)、关闭线程池、唤醒线程池中的任务等。ExecutorService接口的源码如下所示。

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {

    //关闭线程池,线程池中不再接受新提交的任务,但是之前提交的任务继续运行,直到完成
    void shutdown();
    
    //关闭线程池,线程池中不再接受新提交的任务,会尝试停止线程池中正在执行的任务。
    List<Runnable> shutdownNow();
    
    //判断线程池是否已经关闭
    boolean isShutdown();
    
    //判断线程池中的所有任务是否结束,只有在调用shutdown或者shutdownNow方法之后调用此方法才会返回true。
    boolean isTerminated();

    //等待线程池中的所有任务执行结束,并设置超时时间
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    //提交一个Callable接口类型的任务,返回一个Future类型的结果
    <T> Future<T> submit(Callable<T> task);
    
    //提交一个Callable接口类型的任务,并且给定一个泛型类型的接收结果数据参数,返回一个Future类型的结果
    <T> Future<T> submit(Runnable task, T result);

    //提交一个Runnable接口类型的任务,返回一个Future类型的结果
    Future<?> submit(Runnable task);

    //批量提交任务并获得他们的future,Task列表与Future列表一一对应
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    //批量提交任务并获得他们的future,并限定处理所有任务的时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
    
    //批量提交任务并获得一个已经成功执行的任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException; 
    
    //批量提交任务并获得一个已经成功执行的任务的结果,并限定处理任务的时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

关于ExecutorService接口中每个方法的含义,直接上述接口源码中的注释即可,这些接口方法都比较简单,我就不一一重复列举描述了。这个接口也是我们在使用非定时任务类的线程池中最常使用的接口。

四、AbstractExecutorService抽象类

AbstractExecutorService类是一个抽象类,派生自ExecutorService接口,在其基础上实现了几个比较实用的方法,提供给子类进行调用。我们还是来看下AbstractExecutorService类的源码。

注意:大家可以到java.util.concurrent包下查看完整的AbstractExecutorService类的源码,这里,我将AbstractExecutorService源码进行拆解,详解每个方法的作用。

  • newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

RunnableFuture类用于获取执行结果,在实际使用时,我们经常使用的是它的子类FutureTask,newTaskFor方法的作用就是将任务封装成FutureTask对象,后续将FutureTask对象提交到线程池。

  • doInvokeAny方法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    //提交的任务为空,抛出空指针异常
    if (tasks == null)
        throw new NullPointerException();
    //记录待执行的任务的剩余数量
    int ntasks = tasks.size();
    //任务集合中的数据为空,抛出非法参数异常
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //以当前实例对象作为参数构建ExecutorCompletionService对象
    // ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // 记录可能抛出的执行异常
        ExecutionException ee = null;
        // 初始化超时时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();
    
        //提交任务,并将返回的结果数据添加到futures集合中
        //提交一个任务主要是确保在进入循环之前开始一个任务
        futures.add(ecs.submit(it.next()));
        --ntasks;
        //记录正在执行的任务数量
        int active = 1;

        for (;;) {
            //从完成任务的BlockingQueue队列中获取并移除下一个将要完成的任务的结果。
            //如果BlockingQueue队列中中的数据为空,则返回null
            //这里的poll()方法是非阻塞方法
            Future<T> f = ecs.poll();
            //获取的结果为空
            if (f == null) {
                //集合中仍有未执行的任务数量
                if (ntasks > 0) {
                    //未执行的任务数量减1
                    --ntasks;
                    //提交完成并将结果添加到futures集合中
                    futures.add(ecs.submit(it.next()));
                    //正在执行的任务数量加•1
                    ++active;
                }
                //所有任务执行完成,并且返回了结果数据,则退出循环
                //之所以处理active为0的情况,是因为poll()方法是非阻塞方法,可能导致未返回结果时active为0
                else if (active == 0)
                    break;
                //如果timed为true,则执行获取结果数据时设置超时时间,也就是超时获取结果表示
                else if (timed) {    
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                //没有设置超时,并且所有任务都被提交了,则一直阻塞,直到返回一个执行结果
                else
                    f = ecs.take();
            }
            //获取到执行结果,则将正在执行的任务减1,从Future中获取结果并返回
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        //如果从所有执行的任务中获取到一个结果数据,则取消所有执行的任务,不再向下执行
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

这个方法是批量执行线程池的任务,最终返回一个结果数据的核心方法,通过源代码的分析,我们可以发现,这个方法只要获取到一个结果数据,就会取消线程池中所有运行的任务,并将结果数据返回。这就好比是很多要进入一个居民小区一样,只要有一个人有门禁卡,门卫就不再检查其他人是否有门禁卡,直接放行。

在上述代码中,我们看到提交任务使用的ExecutorCompletionService对象的submit方法,我们再来看下ExecutorCompletionService类中的submit方法,如下所示。

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}

可以看到,ExecutorCompletionService类中的submit方法本质上调用的还是Executor接口的execute方法。

  • invokeAny方法
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

这两个invokeAny方法本质上都是在调用doInvokeAny方法,在线程池中提交多个任务,只要返回一个结果数据即可。

直接看上面的代码,大家可能有点晕。这里,我举一个例子,我们在使用线程池的时候,可能会启动多个线程去执行各自的任务,比如线程A负责task_a,线程B负责task_b,这样可以大规模提升系统处理任务的速度。如果我们希望其中一个线程执行完成返回结果数据时立即返回,而不需要再让其他线程继续执行任务。此时,就可以使用invokeAny方法。

  • invokeAll方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    //标识所有任务是否完成
    boolean done = false;
    try {
        //遍历所有任务
        for (Callable<T> t : tasks) {
            将每个任务封装成RunnableFuture对象提交任务
            RunnableFuture<T> f = newTaskFor(t);
            //将结果数据添加到futures集合中
            futures.add(f);
            //执行任务
            execute(f);
        }
        //遍历结果数据集合
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            //任务没有完成
            if (!f.isDone()) {
                try {
                    //阻塞等待任务完成并返回结果
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        //任务完成(不管是正常结束还是异常完成)
        done = true;
        //返回结果数据集合
        return futures;
    } finally {
        //如果发生中断异常InterruptedException 则取消已经提交的任务
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            // 在添加执行任务时超时判断,如果超时则立刻返回futures集合
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }
         // 遍历所有任务
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                //对结果进行判断时进行超时判断
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                //重置任务的超时时间
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

invokeAll方法同样实现了无超时时间设置和有超时时间设置的逻辑。

无超时时间设置的invokeAll方法总体逻辑为:将所有任务封装成RunnableFuture对象,调用execute方法执行任务,将返回的结果数据添加到futures集合,之后对futures集合进行遍历判断,检测任务是否完成,如果没有完成,则调用get方法阻塞任务,直到返回结果数据,此时会忽略异常。最终在finally代码块中对所有任务是否完成的标识进行判断,如果存在未完成的任务,则取消已经提交的任务。

有超时设置的invokeAll方法总体逻辑与无超时时间设置的invokeAll方法总体逻辑基本相同,只是在两个地方添加了超时的逻辑判断。一个是在添加执行任务时进行超时判断,如果超时,则立刻返回futures集合;另一个是每次对结果数据进行判断时添加了超时处理逻辑。

invokeAll方法中本质上还是调用Executor接口的execute方法来提交任务。

  • submit方法

submit方法的逻辑比较简单,就是将任务封装成RunnableFuture对象并提交,执行任务后返回Future结果数据。如下所示。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

从源码中可以看出submit方法提交任务时,本质上还是调用的Executor接口的execute方法。

综上所述,在非定时任务类的线程池中提交任务时,本质上都是调用的Executor接口的execute方法。至于调用的是哪个具体实现类的execute方法,我们在后面的文章中深入分析。

五、ScheduledExecutorService接口

ScheduledExecutorService接口派生自ExecutorService接口,继承了ExecutorService接口的所有功能,并提供了定时处理任务的能力,ScheduledExecutorService接口的源代码比较简单,如下所示。

package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    //延时delay时间来执行command任务,只执行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //延时delay时间来执行callable任务,只执行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    //延时initialDelay时间首次执行command任务,之后每隔period时间执行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
                                                  
    //延时initialDelay时间首次执行command任务,之后每延时delay时间执行一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

至此,我们分析了线程池体系中重要的顶层接口和抽象类。

通过对这些顶层接口和抽象类的分析,我们需要从中感悟并体会软件开发中的抽象思维,深入理解抽象思维在具体编码中的实现,最终,形成自己的编程思维,运用到实际的项目中,这也是我们能够从源码中所能学到的众多细节之一。这也是高级或资深工程师和架构师必须了解源码细节的原因之一。

好了,今天就到这儿吧,我是冰河,我们下期见~~

目录
相关文章
|
3天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
27 6
|
16天前
|
缓存 Java 调度
多线程编程核心:上下文切换深度解析
在现代计算机系统中,多线程编程已成为提高程序性能和响应速度的关键技术。然而,多线程编程中一个不可避免的概念就是上下文切换(Context Switching)。本文将深入探讨上下文切换的概念、原因、影响以及优化策略,帮助你在工作和学习中深入理解这一技术干货。
36 10
|
17天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
16天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
34 4
|
16天前
|
算法 调度 开发者
多线程编程核心:上下文切换深度解析
在多线程编程中,上下文切换是一个至关重要的概念,它直接影响到程序的性能和响应速度。本文将深入探讨上下文切换的含义、原因、影响以及如何优化,帮助你在工作和学习中更好地理解和应用多线程技术。
25 4
|
16天前
|
Java 调度 Android开发
安卓与iOS开发中的线程管理差异解析
在移动应用开发的广阔天地中,安卓和iOS两大平台各自拥有独特的魅力。如同东西方文化的差异,它们在处理多线程任务时也展现出不同的哲学。本文将带你穿梭于这两个平台之间,比较它们在线程管理上的核心理念、实现方式及性能考量,助你成为跨平台的编程高手。
|
20天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
25天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
Java
java线程接口
Thread的构造方法创建对象的时候传入了Runnable接口的对象 ,Runnable接口对象重写run方法相当于指定线程任务,创建线程的时候绑定了该线程对象要干的任务。 Runnable的对象称之为:线程任务对象 不是线程对象 必须要交给Thread线程对象。 通过Thread的构造方法, 就可以把任务对象Runnable,绑定到Thread对象中, 将来执行start方法,就会自动执行Runable实现类对象中的run里面的内容。
39 1
|
27天前
|
Java
为什么一般采用实现Runnable接口创建线程?
因为使用实现Runnable接口的同时我们也能够继承其他类,并且可以拥有多个实现类,那么我们在拥有了Runable方法的同时也可以使用父类的方法;而在Java中,一个类只能继承一个父类,那么在继承了Thread类后我们就不能再继承其他类了。
25 0

推荐镜像

更多