深入理解ThreadPoolExecutor

简介: 在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。

在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。

队列

参数 workQueue指被提交但未执行的任务队列, 它是一个 BlockingQueue接口的对象,仅用于存放 Runnable对象。 根据队列功能分类, 在ThreadPoolExecutor的构造函数中可使用以下几种 BlockingQueue。

  • 直接提交的队列:该功能由 SynchronousQueue对象提供。 SynchronousQueue是一个特殊的 BlockingQueue。 SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作, 反之, 每一个删除操作都要等待对应的插入操作。 如果使用synchronousQueue, 提交的任务不会被真实的保存, 而总是将新任务提交给线程执行,如果没有空闲的进程, 则尝试创建新的进程, 如果进程数量已经达到最大值, 则执行拒绝策略。因此,使用 SynchronousQueue队列,通常要设置很大的 maximumPoolSize否则很容易执行拒绝策略。
  • 有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue实现。 当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示public ArrayBlockingQueue(int capacity);当使用有界的任务队列时, 若有新的任务需要执行, 如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程;若大于corePoolSize,则会将新任务加入等待队列;若等待队列已满,无法加入,则在总线程数不大于 maximumPoolSize的前提下,创建新的进程执行任;若大于 maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙, 否则确保核心线程数维持在 corePoolSize。
  • 无界的任务队列:无界任务队列可以通过 LinkedBlockingQueue类实现。与有界队列相比, 除非系统资源耗尽, 否则无界的任务队列不存在任务入队失败的情况 。 当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源, 则任务直接进入队列等待。 若任务创建和处理的速度差异很大, 无界队列会保持快速増长, 直到耗尽系统内存。
  • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue实现, 可以控制任务的执行先后顺序 。 它是一个特殊的无界队列 。 无论是有界队ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue都是按照先进先出算法处理任务的 。 而 PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行, 在确保系统性能的同时, 也能有很好的质量保证 (总是确保高优先级的任务先执行)。

现在再看下newFixedThreadPool()方法的实现。它返回了一个 corePoolSize和 maximumPoolSize大小一样的,并且使用了 LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务, 当任务提交非常频繁的时候, 该队列可能迅速膨胀,从而耗尽系统资源。

而newSingleThreadExecutor()返回的单线程线程池, 是 newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1 。

newCachedThreadPool()方法返回 corePoolSize为0, maximumPoolSize无穷大的线程池,这意味着在没有任务时, 该线程池内无线程, 而当任务被提交时, 该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入 SynchronousQueue队列,而 SynchronousQueue队列是一种直接提交的队列, 它总会追使线程池增加新的线程执行任务 。 当任务执行完毕后, 由于corePoolSize为 0, 因此空闲线程又会在指定时间内(60秒)被回收。

拒绝策略

ThreadPoolExecutor的最后一个参数handler指定了拒绝策略 。也就是当任务数量超过系统实际承载能力时, 该如何处理呢?这时就要用到拒绝策略了 。拒绝策略可以说是系统超负荷运行时的补救措施, 通常由于压力太大而引起的, 也就是线程池中的线程已经用完了, 无法继续为新任务服务, 同时, 等待队列中也已经排满了, 再也塞不下新任务了 。这时我们就需要有一套机制, 合理地处理这个问题。

JDK内置提供了四种拒绝策略:

  • AbortPolicy策略:该策略会直接抛出运行时RejectedExecutionException异常,阻止系统正常工作。
  • CauerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务, 并尝试再次提交当前任务。
  • DiscardPolicy策略:该策略将丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种方案

以上内置的策略均实现了 RejectedExecutionHandler接口, 若以上策略仍无法满足实际应用需要,完全可以自己扩展 RejectedExecutionHandler接口 ,RejectedExecutionHandler的定义如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

下面列举一个代码简单说明一下

/**
 * 自定义线程池和拒绝策略
 * Created by xmr on 2018/9/5.
 */
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {

        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" +
                    Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /*自定义线程池和拒绝策略*/
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),//默认创建线程的实现方式,可以省略
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

扩展线程池

虽然 JDK已经帮我们实现了这个稳定的高性能线程池 。 但如果我们需要对这个线程池做一些扩展, JDK还是给了我们支持了,它提供了 beforeExecute()、afterExecute()和 terminated()三个接口对线程池进行控制 。

在默认的ThreadPoolExecutor实现中,提供了空的 beforeExecute0和 aferExecute0实现。在实际应用中, 可以对其进行扩展来实现对线程池运行状态的跟踪, 输出一些有用的调试信息, 以帮助系统故障诊断, 这对于多线程程序错误排査是很有帮助的 。 下面演示了对线程池的扩展

/**
 * 扩展线程池
 * Created by xmr on 2018/9/5.
 */
public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println("正在执行 : Thread ID:" +
                    Thread.currentThread().getId() + ",Task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行: " + ((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成: " + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

 

参考:

https://blog.csdn.net/linghu_java/article/details/17123057

相关文章
|
5月前
|
监控 Java 调度
Java线程池ThreadPoolExecutor初略探索
Java线程池ThreadPoolExecutor初略探索
|
3月前
|
监控 Java
ThreadPoolExecutor 介绍
ThreadPoolExecutor 介绍
33 0
|
3月前
|
Java
ThreadPoolExecutor 使用
ThreadPoolExecutor 使用
28 0
|
5月前
|
Java
线程池ThreadPoolExecutor总结
线程池ThreadPoolExecutor总结
|
6月前
|
缓存 搜索推荐 Java
线程池之ThreadPoolExecutor
线程池之ThreadPoolExecutor
65 0
|
机器学习/深度学习 消息中间件 存储
ThreadPoolExecutor解读
ThreadPoolExecutor解读
|
算法 安全 Java
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
|
存储 缓存 监控
ThreadPoolExecutor:线程池不允许使用Executors创建
ThreadPoolExecutor:线程池不允许使用Executors创建
376 0
ThreadPoolExecutor:线程池不允许使用Executors创建
|
存储 缓存 监控
线程池 ThreadPoolExecutor 详解
对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破坏了数据的局部性。因此在并发编程中,当线程创建过多时,会影响程序性能,甚至引起程序崩溃。 而线程池属于池化管理模式,具有以下优点: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的性能消耗。 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性:能够对线程进行统一分配、调优和监控。
212 0
线程池之ThreadPoolExecutor使用
通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景