从源码理解线程池

简介: Executor接口源码非常简单,只有一个execute(Runnable command)回调接口 public interface Executor { void execute(Runnable command);}执行已提交的 Runnable 任务对象。

Executor接口

源码非常简单,只有一个execute(Runnable command)回调接口 

public interface Executor {
    void execute(Runnable command);
}

执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
 ...

不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这展示了一个复合执行程序。

class SerialExecutor implements Executor {
     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
     final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) {
         this.executor = executor;
     }

     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }
 }

ExecutorService接口

该接口提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。先看下源码

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Executors类为创建ExecutorService提供了便捷的工厂方法。它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。

ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:

1、Future<?> submit(Runnable task)

2、<T> Future<T> submit(Callable<T> task)

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法

Executor.execute(java.lang.Runnable)。

下面对ExecutorService的函数进行一下简单介绍:

  • void shutdown():启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。
  • List<Runnable> shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。它无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。应该关闭未使用的 ExecutorService以允许回收其资源。
  • boolean isShutdown():如果此执行程序已关闭,则返回 true。
  • boolean isTerminated():如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
  • boolean awaitTermination(long timeout,TimeUnit unit) :如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 
  • <T> Future<T> submit(Callable<T> task):提交一个有返回值的任务用于执行,返回一个表示任务结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
  • <T> Future<T> submit(Runnable task,T result):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
  • Future<?> submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。注意,该方法会一直阻塞直到所有任务完成。可以正常地或通过抛出异常来终止已完成任务。如果正在进行此操作时修改了给定的collection,则此方法的结果是不确定的。

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个线程池之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。

每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。

对于核心的几个线程池,无论是 newFixedThreadPool()方法、 newSingleThreadExecutor()还是 newCachedThreadPool()方法, 虽然看起来创建的线程有着完全不同的功能特点, 但其内部实现均使用了 ThreadPoolExecutor实现。 下面给出了这三个线程池的实现方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

由以上线程池的实现代码可以看到, 它们都只是 ThreadPoolExecutor 类的封装 。 为何ThreadPoolExecutor有如此强大的功能呢? 来看一下 ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //后两个参数为可选参数

参数说明:

  • corePoolSize:指定了线程池中的核心线程数
  • maximumPoolSize:最大可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池
  • keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止
  • unit:keepAliveTime参数的时间单位
  • workQueue:保存任务的阻塞队列,被提交但尚未执行的任务,与线程池的大小有关:当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列;  当运行的线程数等于或多于corePoolSize,在有新任务添加时则先加入队列,不直接创建线程;  当队列满时,在有新任务时就创建新线程
  • threadFactory:线程工厂,用于创建新线程,默认使用defaultThreadFactory创建线程
  • handle:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

ThreadPoolExecutor将根据corePoolSize和 maximumPoolSize设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的; 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程;如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池;如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。 

 

参考:

https://www.cnblogs.com/MOBIN/p/5436482.html

http://cmsblogs.com/?p=2444

https://blog.csdn.net/linghu_java/article/details/17123057

 

相关文章
|
7月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解02-阻塞队列之ArrayBlockingQueue
`ArrayBlockingQueue` 是Java中一个基于数组的并发队列,具有线程安全的性质。以下是其关键信息的摘要: - **继承实现关系**:它扩展了`AbstractQueue`并实现了`BlockingQueue`接口,确保线程安全的入队和出队操作。 - **数据结构**:内部由固定大小的数组支撑,有`takeIndex`和`putIndex`跟踪元素的添加和移除位置,`count`记录队列中的元素数量。 - **特点**:队列长度在创建时必须指定且不可变,遵循先进先出(FIFO)原则,当队列满时,添加元素会阻塞,空时,移除元素会阻塞。
72 0
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
144 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
35 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
35 1
|
4月前
|
算法 安全 Java
深入解析Java多线程:源码级别的分析与实践
深入解析Java多线程:源码级别的分析与实践
|
6月前
|
缓存 并行计算 Java
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
|
6月前
|
Java
线程池ThreadPoolExcutor源码剖析---工作原理
线程池ThreadPoolExcutor源码剖析---工作原理
|
6月前
|
Java
|
6月前
|
分布式计算 Java Spark
|
7月前
|
Java 调度
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
90 1