在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。
队列
参数 workQueue指被提交但未执行的任务队列, 它是一个 BlockingQueue接口的对象,仅用于存放 Runnable对象。 根据队列功能分类, 在ThreadPoolExecutor的构造函数中可使用以下几种 BlockingQueue。
- 直接提交的队列:该功能由 SynchronousQueue对象提供。 SynchronousQueue是一个特殊的 BlockingQueue。 SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作, 反之, 每一个删除操作都要等待对应的插入操作。 如果使用synchronousQueue, 提交的任务不会被真实的保存, 而总是将新任务提交给线程执行,如果没有空闲的进程, 则尝试创建新的进程, 如果进程数量已经达到最大值, 则执行拒绝策略。因此,使用 SynchronousQueue队列,通常要设置很大的 maximumPoolSize否则很容易执行拒绝策略。
- 有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue实现。 当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示public ArrayBlockingQueue(int capacity);当使用有界的任务队列时, 若有新的任务需要执行, 如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程;若大于corePoolSize,则会将新任务加入等待队列;若等待队列已满,无法加入,则在总线程数不大于 maximumPoolSize的前提下,创建新的进程执行任;若大于 maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙, 否则确保核心线程数维持在 corePoolSize。
- 无界的任务队列:无界任务队列可以通过 LinkedBlockingQueue类实现。与有界队列相比, 除非系统资源耗尽, 否则无界的任务队列不存在任务入队失败的情况 。 当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源, 则任务直接进入队列等待。 若任务创建和处理的速度差异很大, 无界队列会保持快速増长, 直到耗尽系统内存。
- 优先任务队列:优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue实现, 可以控制任务的执行先后顺序 。 它是一个特殊的无界队列 。 无论是有界队ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue都是按照先进先出算法处理任务的 。 而 PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行, 在确保系统性能的同时, 也能有很好的质量保证 (总是确保高优先级的任务先执行)。
现在再看下newFixedThreadPool()方法的实现。它返回了一个 corePoolSize和 maximumPoolSize大小一样的,并且使用了 LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务, 当任务提交非常频繁的时候, 该队列可能迅速膨胀,从而耗尽系统资源。
而newSingleThreadExecutor()返回的单线程线程池, 是 newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1 。
newCachedThreadPool()方法返回 corePoolSize为0, maximumPoolSize无穷大的线程池,这意味着在没有任务时, 该线程池内无线程, 而当任务被提交时, 该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入 SynchronousQueue队列,而 SynchronousQueue队列是一种直接提交的队列, 它总会追使线程池增加新的线程执行任务 。 当任务执行完毕后, 由于corePoolSize为 0, 因此空闲线程又会在指定时间内(60秒)被回收。
拒绝策略
ThreadPoolExecutor的最后一个参数handler指定了拒绝策略 。也就是当任务数量超过系统实际承载能力时, 该如何处理呢?这时就要用到拒绝策略了 。拒绝策略可以说是系统超负荷运行时的补救措施, 通常由于压力太大而引起的, 也就是线程池中的线程已经用完了, 无法继续为新任务服务, 同时, 等待队列中也已经排满了, 再也塞不下新任务了 。这时我们就需要有一套机制, 合理地处理这个问题。
JDK内置提供了四种拒绝策略:
- AbortPolicy策略:该策略会直接抛出运行时RejectedExecutionException异常,阻止系统正常工作。
- CauerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
- DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务, 并尝试再次提交当前任务。
- DiscardPolicy策略:该策略将丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种方案
以上内置的策略均实现了 RejectedExecutionHandler接口, 若以上策略仍无法满足实际应用需要,完全可以自己扩展 RejectedExecutionHandler接口 ,RejectedExecutionHandler的定义如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
下面列举一个代码简单说明一下
/**
* 自定义线程池和拒绝策略
* Created by xmr on 2018/9/5.
*/
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable {
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" +
Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*自定义线程池和拒绝策略*/
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),//默认创建线程的实现方式,可以省略
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is discard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i++) {
es.submit(task);
Thread.sleep(10);
}
}
}
扩展线程池
虽然 JDK已经帮我们实现了这个稳定的高性能线程池 。 但如果我们需要对这个线程池做一些扩展, JDK还是给了我们支持了,它提供了 beforeExecute()、afterExecute()和 terminated()三个接口对线程池进行控制 。
在默认的ThreadPoolExecutor实现中,提供了空的 beforeExecute0和 aferExecute0实现。在实际应用中, 可以对其进行扩展来实现对线程池运行状态的跟踪, 输出一些有用的调试信息, 以帮助系统故障诊断, 这对于多线程程序错误排査是很有帮助的 。 下面演示了对线程池的扩展
/**
* 扩展线程池
* Created by xmr on 2018/9/5.
*/
public class ExtThreadPool {
public static class MyTask implements Runnable {
public String name;
public MyTask(String name) {
this.name = name;
}
public void run() {
System.out.println("正在执行 : Thread ID:" +
Thread.currentThread().getId() + ",Task name= " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行: " + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成: " + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i < 5; i++) {
MyTask task = new MyTask("TASK-" + i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
参考: