从源码理解线程池

简介: Executor接口源码非常简单,只有一个execute(Runnable command)回调接口 public interface Executor { void execute(Runnable command);}执行已提交的 Runnable 任务对象。

Executor接口

源码非常简单,只有一个execute(Runnable command)回调接口 

public interface Executor {
    void execute(Runnable command);
}

执行已提交的 Runnable 任务对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
 ...

不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这展示了一个复合执行程序。

class SerialExecutor implements Executor {
     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
     final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) {
         this.executor = executor;
     }

     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }
 }

ExecutorService接口

该接口提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。先看下源码

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Executors类为创建ExecutorService提供了便捷的工厂方法。它只有一个直接实现类ThreadPoolExecutor和间接实现类ScheduledThreadPoolExecutor。

ExecutorService在Executor的基础上增加了一些方法,其中有两个核心的方法:

1、Future<?> submit(Runnable task)

2、<T> Future<T> submit(Callable<T> task)

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法submit扩展了基本方法

Executor.execute(java.lang.Runnable)。

下面对ExecutorService的函数进行一下简单介绍:

  • void shutdown():启动一个关闭命令,不再接受新任务,当所有已提交任务执行完后,就关闭。
  • List<Runnable> shutdownNow():试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。它无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。例如,通过 Thread.interrupt() 来取消典型的实现,所以任何任务无法响应中断都可能永远无法终止。应该关闭未使用的 ExecutorService以允许回收其资源。
  • boolean isShutdown():如果此执行程序已关闭,则返回 true。
  • boolean isTerminated():如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
  • boolean awaitTermination(long timeout,TimeUnit unit) :如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 
  • <T> Future<T> submit(Callable<T> task):提交一个有返回值的任务用于执行,返回一个表示任务结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
  • <T> Future<T> submit(Runnable task,T result):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
  • Future<?> submit(Runnable task):提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回 null。
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。注意,该方法会一直阻塞直到所有任务完成。可以正常地或通过抛出异常来终止已完成任务。如果正在进行此操作时修改了给定的collection,则此方法的结果是不确定的。

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个线程池之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。

每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。

对于核心的几个线程池,无论是 newFixedThreadPool()方法、 newSingleThreadExecutor()还是 newCachedThreadPool()方法, 虽然看起来创建的线程有着完全不同的功能特点, 但其内部实现均使用了 ThreadPoolExecutor实现。 下面给出了这三个线程池的实现方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

由以上线程池的实现代码可以看到, 它们都只是 ThreadPoolExecutor 类的封装 。 为何ThreadPoolExecutor有如此强大的功能呢? 来看一下 ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //后两个参数为可选参数

参数说明:

  • corePoolSize:指定了线程池中的核心线程数
  • maximumPoolSize:最大可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池
  • keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止
  • unit:keepAliveTime参数的时间单位
  • workQueue:保存任务的阻塞队列,被提交但尚未执行的任务,与线程池的大小有关:当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列;  当运行的线程数等于或多于corePoolSize,在有新任务添加时则先加入队列,不直接创建线程;  当队列满时,在有新任务时就创建新线程
  • threadFactory:线程工厂,用于创建新线程,默认使用defaultThreadFactory创建线程
  • handle:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

ThreadPoolExecutor将根据corePoolSize和 maximumPoolSize设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的; 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程;如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池;如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。 

 

参考:

https://www.cnblogs.com/MOBIN/p/5436482.html

http://cmsblogs.com/?p=2444

https://blog.csdn.net/linghu_java/article/details/17123057

 

相关文章
|
22天前
|
存储 Java 数据库
从源码全面解析Java 线程池的来龙去脉
从源码全面解析Java 线程池的来龙去脉
|
22天前
|
Java 调度
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
Java面试必考题之线程的生命周期,结合源码,透彻讲解!
50 1
|
22天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
260 5
|
22天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue
`LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。 `xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹
256 5
|
22天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
236 2
|
22天前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
184 1
|
22天前
|
Java 数据处理
fastdfs源码阅读:文件传输原理与网络IO模型(accept线程、work线程(网络io处理)、dio线程(文件io处理))
fastdfs源码阅读:文件传输原理与网络IO模型(accept线程、work线程(网络io处理)、dio线程(文件io处理))
58 0
|
22天前
|
负载均衡 NoSQL Java
redis7.0源码阅读(四):Redis中的IO多线程(线程池)
redis7.0源码阅读(四):Redis中的IO多线程(线程池)
95 0
|
22天前
|
网络协议 NoSQL Linux
知识巩固源码落实之5:http get异步请求数据demo(多线程+struct epoll_event的ptr)
知识巩固源码落实之5:http get异步请求数据demo(多线程+struct epoll_event的ptr)
27 0
|
22天前
|
XML Java 调度
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
36 0