Executors 正确使用的姿势是怎么样?

简介: Executors 正确使用的姿势是怎么样?

在程序开发过程中,有时要使用到线程池来创建线程,来执行异步的任务,但由于对线程池的不了解,可能会埋下地雷,那怎么使用线程池才是正确的呢?

在《阿里巴巴Java开发手册》中有作出明确的描述:“线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。”。

先来看看一段简单的测试代码:

public class ThreadDemo {
   
   
  public static void main(String[] args) {
   
   
    ExecutorService es = Executors.newSingleThreadExecutor();
  }
}

大家觉得上面这段代码会有什么问题吗?如果不清楚,我们可以使用阿里巴巴公司提供的P3C检查代码插件(有eclipse、idea,感兴趣的小伙伴可以自行安装),来看看P3C检查的结果:

image.png

P3C检查不通过,提示线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。(PS:很难得在编译器中看到中文提示,对于英语不好的同学来说,简直是福音,喜极而泣!!!!)

那么,为什么不允许使用Executors.newSingleThreadExecutor()方式来创建线程呢?

打开程序,找到Executors类,我们可以看到,Executors类提供了多种创建线程池的方法,经常使用的工厂方法有:

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

来看看这几个创建线程的静态方法:newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()。

public static ExecutorService newSingleThreadExecutor() {
   
   
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}


public static ExecutorService newFixedThreadPool(int nThreads) {
   
   
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool() {
   
   
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

从上面的源码,可以看到newSingleThreadExecutor是通过FinalizableDelegatedExecutorService的代理模式来创建,从ThreadPoolExecutor的构造方法的参数可以看出,是创建一个只有一个运行线程的线程池,那这样有什么问题吗?来看看ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
   
   
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
}

ThreadPoolExecutor的构造方法的参数很多,它们的含义是:

  • corePoolSize:线程池中的线程数量;

  • maximumPoolSize:线程池中的最大线程数量;

  • keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程会在多长时间内被销毁;

  • unit:keepAliveTime的时间单位;

  • workQueue:任务队列,被提交但是尚未被执行的任务;

  • threadFactory:线程工厂,用于创建线程,一般情况下使用默认的,即Executors类的静态方法defaultThreadFactory();

  • handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。

这⾥有两个参数需要注意一下:corePoolSize和maximumPoolSize。看⼀下这两个参数的说明:

*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {
   
   @code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool

意思⼤概是说corePoolSize是线程池中的最⼤线程数量,⽽maxmumPoolSize意思是阻塞队列最⼤的阻塞线程数量,然后我们再回去看⼀下Executors.newSingleThreadService⽅法,给出的这两个参数都是1,那么问题来了,如果我想要搞⼀个只有⼀个运⾏线程的线程池时这个⽅法就不能满⾜我们的需求了,那么直接new⼀个ThreadPoolExecutor如:new ThreadPoolExecutor(1, 0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())可不可以呢?答案是:不可以,因为在ThreadPoolExecutor的构造函数⾥的第⼀个if语句就判断了,如果maximumPoolSize <= 0就会抛出⼀个异常。再回来看看,如果我们使⽤了JDK⾃带的newSingleThreadPool会发⽣什么情况:

  • 第⼀次submit⼀个线程,因为没有在运⾏的线程,所以该线程不会被提交到阻塞队列⾥,⽽是直接start运⾏。
  • 紧接着我们再submit⼀个线程,这次因为已经有⼀个线程在运⾏了,并且运⾏的线程数量等于corePoolSize所以,这个线程会被提交到阻塞队列。
  • 再submit第三个线程,这次就是饱和策略⼤显⾝⼿的时候了。

What,我原本是不想让第⼆个线程提交的,这样不就算是重复提交了吗?对,是重复提交。所以看来⾃带的JDK给的线程池是不能满⾜需求了。下⾯是ThreadPoolExecutor.execute⽅法的部分源码,可以解释上⾯的分析结果:

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
   
   
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
   
   
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

既然newSingleThreadExecutor有这么多问题,难怪阿里爸爸会建议通过ThreadPoolExecutor的方式来实现,Executors类中的静态方法也是用它,只不过帮我们配了一些参数而已。

前面已经了解了ThreadPoolExecutor构造方法的参数及说明,那这些参数之间有什么关系呢?
1)corePoolSize与maximumPoolSize的关系

  • corePoolSize肯定是 <= maximumPoolSize。

  • 若当前线程池中线程数 < corePoolSize,则每来一个任务就创建一个线程去执行;

  • 若当前线程池中线程数 >= corePoolSize,会尝试将任务添加到任务队列。如果添加成功,则任务会等待空闲线程将其取出并执行;

  • 若队列已满,且当前线程池中线程数 < maximumPoolSize,创建新的线程;

  • 若当前线程池中线程数 >= maximumPoolSize,则会采用拒绝策略(JDK提供了四种,下面会介绍到)。

注意:关系3是针对的有界队列,无界队列永远都不会满,所以只有前2种关系。

2)workQueue

参数workQueue是指提交但未执行的任务队列。若当前线程池中线程数>=corePoolSize时,就会尝试将任务添加到任务队列中。主要有以下几种:

  • SynchronousQueue:直接提交队列。SynchronousQueue没有容量,所以实际上提交的任务不会被添加到任务队列,总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略。

  • LinkedBlockingQueue:无界的任务队列。当有新的任务来到时,若系统的线程数小于corePoolSize,线程池会创建新的线程执行任务;当系统的线程数量等于corePoolSize后,因为是无界的任务队列,总是能成功将任务添加到任务队列中,所以线程数量不再增加。若任务创建的速度远大于任务处理的速度,无界队列会快速增长,直到内存耗尽。

3) handler

JDK内置了四种拒绝策略:

  • DiscardOldestPolicy策略:丢弃任务队列中最早添加的任务,并尝试提交当前任务;

  • CallerRunsPolicy策略:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度。详细参看这篇博文:正确理解CallerRunsPolicy()拒绝策略

  • DiscardPolicy策略:默默丢弃无法处理的任务,不予任何处理。

  • AbortPolicy策略:直接抛出异常,阻止系统正常工作。

了解了ThreadPoolExecutor相关参数说明,来使用ThreadPoolExecutor创建线程池,看看结果是什么:

public class ThreadDemo {
   
   
  public static void main(String[] args) {
   
   
    ExecutorService es = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.DiscardPolicy());
  }
}

再用P3C检查代码,终于没有报错了。

总结:

1. newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

优点:单线程的线程池,保证线程的顺序执行

缺点:不适合并发

2. newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
优点:固定大小线程池,超出的线程会在队列中等待
缺点:不支持自定义的拒绝策略,大小固定,难以扩展

3. newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
优点:很灵活,弹性的线程池线程管理,用多少线程给多大的线程池,没用后及时回收,用则新建
缺点:一旦线程无限增长,会导致内存溢出

4. newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行
优点:一个固定大小线程池,可以定时或周期性的执行任务
缺点:任务是单线程方式执行,一旦一个任务失败其他任务也受影响

结果:
以上线程池都不支持自定义拒绝策略。

  • newFixedThreadPool 和 newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
  • newCachedThreadPool 和 newScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至
    OOM(Out Of Memory)。

当然,通过ThreadPoolExecutor创建线程池

  • 优点:集上述优点于一身

  • 缺点:没发现缺点,因为上述线程池的底层就是通过它来创建的。。哈哈哈

参考资料:
1、https://blog.csdn.net/dabusiGin/article/details/105327873/

2、《阿里巴巴Java开发手册》

目录
相关文章
|
1月前
|
存储 Java 开发者
为什么建议不要使用Executors来创建线程池?
为什么建议不要使用Executors来创建线程池?
15 0
|
9月前
|
机器学习/深度学习 消息中间件 存储
ThreadPoolExecutor解读
ThreadPoolExecutor解读
|
数据采集 缓存 安全
Executors 与 Thread 比较
Executors 与 Thread 比较
75 0
|
消息中间件 Dubbo Java
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
|
算法 安全 Java
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
深入理解ThreadPoolExecutor
|
存储 缓存 监控
ThreadPoolExecutor:线程池不允许使用Executors创建
ThreadPoolExecutor:线程池不允许使用Executors创建
349 0
ThreadPoolExecutor:线程池不允许使用Executors创建
|
缓存 Java
为什么线程池不允许使用Executors去创建?
Executors Executors存在什么问题 Executors为什么会OOM 创建线程池的正确姿势
为什么线程池不允许使用Executors去创建?
|
Java
ThreadPoolExecutor详解
详细介绍了ThreadPoolExecutor线程池的各种参数和各种使用场景以及线程池中线程的创建时刻和策略
4184 0
|
Java
ThreadPoolExecutor
一 核心属性 1  int corePoolSize 指该线程池中核心线程数最大值 核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程。
4666 0
|
Java
ExecutorService
    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.
900 0