Executor体系
java中,new一个线程对象是耗费资源的操作,对于需要大量线程创建的场景可以使用线程池来解决。
使用线程池不仅能够降低创建和销毁线程的性能开销,如果合理的设置线程池还能够避免无限制的创建线程资源,保持系统稳定。
jdk中内置了Executor框架,可以用于实现线程池,大体结构如下:
Executor
Executor接口只定义了一个execute方法,可以用于执行一个Runnable类型对象:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*/
void execute(Runnable command);
}
ExecutorService
ExecutorService扩展了Executor接口,提供了更丰富的方法:
public interface ExecutorService extends Executor {
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*/
void shutdown();
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*/
Future<?> submit(Runnable task);
}
submit和execute的区别
执行一个任务,可以使用submit和execute,这两者有什么区别呢?
1. execute只能接受Runnable类型的任务;
2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
submit和execute
submit和execute两者的区别:
1. execute只能接受Runnable类型的任务;
2. submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null。
ThreadPoolExecutor
ThreadPoolExecutor表示一个线程池,ThreadPoolExecutor实现了Executor接口,任何Runnable类型的线程都可以被ThreadPoolExecutor线程池调度。
jdk还提供了Executors类用于便捷的创建线程池,Executors相当于线程池工厂,通过Executors可以获得拥有特定功能的线程池,其主要API如下:
public class Executors {
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue.
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue.
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
API简析:
newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池空闲则立即执行;若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
newSingleThreadExecutor:创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
newCachedThreadPool:返回一个corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE的线程池,因为corePoolSize为0,所以所有线程在空闲60s后就会被回收。
线程池的线程数量不确定,但若有空闲线程则直接复用;如果所有线程都在工作,并且此时又有新的任务提交,则会创建新的线程处理任务,并且每一个空闲线程会在超时后后自动回收。
newSingleThreadScheduledThreadPool:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService智商扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
newScheduledThreadPool:该方法也会返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。
线程池内部实现
Executors的如下方法,实质上都是对ThreadPoolExecutor的封装:
newFixedThreadPool(int nThreads)
newSingleThreadExecutor()
newCachedThreadPool()
ThreadPoolExecutor的构造方法如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
参数含义:
corePoolSize:指定了线程池中的线程数量;
maximumPoolSize:指定了线程池中的最大线程数量;
keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的线程能够存活的时间,超过时间则被销毁;
unit:keepAliveTime的单位;
workQueue:任务队列,用于保存被提交但尚未被执行的任务;
threadFactory:线程工厂,用于创建线程,一般用默认的即可;
handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。
任务队列
workQueue接收的类型为BlockingQueue的阻塞队列,且只能存放Runnable类型对象。ThreadPoolExecutor中可能用到的阻塞队列如下。
SynchronousQueue
SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每一个插入操作都要等待一个相应的删除操作。提交的任务不会真实地保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则创建新的线程,如果线程数达到最大值,则执行拒绝策略。
Executors.newCachedThreadPool()
返回的线程池就是使用的这种队列。
ArrayBlockingQueue
这是一个有界的阻塞队列,数组实现,其构造函数如下:
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
当线程池的线程数小于corePoolSize,则优先创建新的线程,否则将任务加入等待队列。若等待队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize,则执行拒绝策略。
因此,有界队列仅在队列装满时,才可能将线程数量提高到corePoolSize以上,除非系统非常繁忙,否则有界队列能够确保核心线程数维持在corePoolSize。
LinkedBlockingQueue
这个也是有边界的队列,但是是链表实现的,如果初始化的时候不指定边界值则默认是Interger.MAX_VALUE
:
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
PriorityBlockQueue
带有执行优先级的阻塞队列,可以控制任务执行的先后顺序,没有边界。其根据任务自身的优先级优先级顺序先后执行。
execute
从execute的实际执行过程中,可以观察到corePoolSize和maximumPoolSize的具体应用:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
workerCountOf(c)获取了当前线程的线程总数,当线程总数小于corePoolSize时,会将任务通过addWorker()直接调度执行,否则,通过workQueue.offer()将任务加入任务队列中等待。
如果加入队列失败(例如有界队列达到上限或使用了SynchronousQueue),则将任务直接提交给线程池,如果当前线程已经达到maximumPoolSize,则提交失败,执行拒绝策略。
其执行流程示意图如下:
拒绝策略
当线程池的线程数达到maximumPoolSize,且任务队列也满了的情况下,此时已超超出了线程池的负载能力,就会使用拒绝策略。jdk中内置了四种拒绝策略,这四种策略均在ThreadPoolExecutor中:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
AbortPolicy
该策略会直接抛出异常,组织系统正常工作,这是默认策略:
public class ThreadPoolExecutor extends AbstractExecutorService {
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
CallerRunsPolicy
该策略会直接在调用者线程中,运行当前被丢弃的任务,这种策略下,任务不会被真正的丢弃,但是会影响任务提交的线程性能。
DiscardOldestPolicy
该策略会丢弃最老的一个请求,也就是即将被执行的那个请求,并尝试再次提交当前任务。
DiscardPolicy
该策略默默地丢弃无法处理的任务,不予任何处理,如果不允许任务丢失,则不能使用这种策略。
自定义策略
内置策略均是实现RejectedExecutionHandler接口:
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
如果内置策略不能满足需求,可以选择通过实现RejectedExecutionHandler接口自定义策略:
public class CustomerRejectPoliceDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + " Thread ID: " + Thread.currentThread().getId());
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0l,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(),
(r, executor) -> System.out.println(r.toString() + " is discard"));
for (int i = 0; i < 30; i++) {
executorService.submit(task);
}
}
}