深入理解ThreadPoolExecutor

简介: 深入理解ThreadPoolExecutor

1.简单了解ThreadPoolExecutor


1.1 从构造函数开始了解



public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler)

下面我们将解释这个构造函数的七个形参


corePoolSize 表示线程池的核心线程数


maximumPoolSize 表示线程池的最大线程数


keepAliveTime 表示某个线程在多长时间内没有获取到任务时结束线程


unit 表示keepAliveTime的时间单位,会转化成纳秒


workQueue 表示存放Runnable的任务队列。队列分为有界队列和无界队列,不同类型的队列会影响到maximumPoolSize是否起作用


threadFactory 表示生成线程池中工作线程的线程工厂


handler 表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略


1.2举例说明

//1 第一步创建线程池执行器
    ThreadPoolExecutor threadPoolExecutor =  
            new ThreadPoolExecutor(
                    3,//corePoolSize为3
                    5,//maximumPoolSize为5
                    60,//超时时间为60s
                    TimeUnit.SECONDS, 
                    //使用有界队列,队列大小为10  
                    new ArrayBlockingQueue<Runnable>(10),
                    //使用默认的线程工厂
                    Executors.defaultThreadFactory(),
                //当任务队列已满而且线程超过最大线程数,任务的拒绝策略
                    new ThreadPoolExecutor.AbortPolicy());
    //2.第二步创建任务。假设每个任务都是睡眠10s钟
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //3.第三步,线程池执行任务
      threadPoolExecutor.execute(runnable)

1.3任务执行的流程

1.3.1执行流程描述和示例图

任务执行的流程主要分为三大步


1. 执行一个任务时。首先判断当前线程池中的线程数是否小于corePoolSize。如果是则创建新的工作线程并执行任务。如果不是或工作线程创建失败则执行步骤2。


2. 判断任务队列是否填满了任务。如果没有则将任务放进任务队列中等待线程执行。如果满了则执行步骤3。


3. 判断当前线程池中的线程数是否小于maximumPoolSize,如果等于maximumPoolSize则拒绝该任务的执行。如果不是则创建新的工作线程来执行该任务。如果创建工作线程失败,也拒绝该任务


示例图如下

20160716000225017.png


1.3.2举例说明线程池的执行过程


从创建threadPoolExecutor的时候计时t=0s corePoolSize=3 maximumPoolSize=5 任务队列大小为10 每个任务执行10s


1. t=0s执行threadPoolExecutor.execute(runnable),此时线程池中的线程为0,0小于corePoolSize 3,则创建一个工作线程名为pool-1-thread-1执行runnable。示例图如下

20160716010929044.png2. t=1s时执行threadPoolExecutor.execute(runnable),由于t=0时已经创建了pool-1-thread-1的线程,线程池中的线程数为1,1小于corePoolSize,则创建名为pool-1-thread-2执行runnable。示例图如下

20160716011831111.png


3. t=2s时分别执行 threadPoolExecutor.execute(runnable3)和

threadPoolExecutor.execute(runnable4)。执行runnable3时由于线程池线程个数为2小于corePoolSize,会创建pool-1-thread-2线程来执行runnable3.当执行runnable4的时候由于线程池个数已经大于等于corePoolSize了。所以不会再创建新的工作线程,而是把runnable4放入任务队列中。示例图如下

20160716012830870.png

4. t=3s时分别执行任务runnable4—runnable13。由于线程池线程数量已经等于corePoolSize,而任务队列没有满,所以把runnable放入到任务队列中。示例图如下

20160716014124340.png

5. t=4s时执行runnable14。由于线程池中的线程执行的任务的时间为10s,而此刻才过去了4s。所以线程池中的线程都在忙,而任务队列都已经满了。此刻只能在保证线程池中的总线程数不超过maximumPoolSize也就是5的情况下再创建新的线程pool-1-thread-4去执行runnable14。示例图如下

20160716014628608.png

6. t=5s时执行runnable15。由于线程池中的线程执行的任务的时间为10s,而此刻才过去了5s。所以线程池中的线程都在忙,而任务队列都已经满了。此刻只能在保证线程池中的总线程数不超过maximumPoolSize也就是5的情况下再创建新的线程pool-1-thread-5去执行runnable15。示例图如下

20160716014930765.png


7. t=6s时执行runnable16。由于此刻线程池中的线程已经等于最大线程数,任务队列也已经装满了任务。那么runnable16将被相应的拒绝策略来拒绝。默认的是ThreadPoolExecutor.AbortPolicy。表示runnable16将被抛弃。示例图如下

20160716015423096.png


8.t=11s时,由于pool-1-thread-1和pool-1-thread-2已经将runnable执行完毕。它们将去任务队列中取任务(workQueue.take()方法).示例图如下

20160716020547945.png


9.t=30s时,线程池中线程还是5个。任务队列中已经没有任务了。线程池中的线程都在等待往任务队列加入任务


2. 任务如何被执行


当一个Runnable被ThreadPoolExecutor执行时,这个任务是如何被执行的呢?这里分两种情况,第一种情况,当任务被线程池执行的时候,线程池中的线程池数量满足创建新的工作线程,则会创建一个新的工作线程直接执行该任务。第二种情况,当线程池没有条件创建新的工作线程,则会将该任务放入任务队列中,等线程池中的其他工作线程有空闲的时候从任务队列中取出任务并执行。


2.1如何创建一个工作线程


ThreadPoolExecutor通过调用addWorker(Runnable firstTask, boolean core) 方法来创建工作线程。我们先来看一下Worker的类定义

20160716103105090.pngWorker类会持有一个线程thread,

thread初始化方式为this.thread = getThreadFactory().newThread(this);

当thread调用start()方法就会调用Woker的run()方法。


那么要想知道工作线程是如何创建并运行的,以及运行后干了什么事情,就必须要分别知道new Worker(),thread.start()分别是在什么时候调用的。经过查找发现在addWorker方法中创建工作线程并启动线程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                //label 1
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //label2
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们来解析该源码


形参firstTask表示要执行的任务,core为true表示如果当前线程小于corePoolSize则创建新的线程,如果大于等于corePoolSize则创建失败,core为false表示如果当前线程小于maximumPoolSize则创建新的线程,如果大于等于maximumPoolSize则创建失败。


创建工作线程是可能在多线程的情况下执行。所以需要用CAS算法,保证当前没有其它的线程创建了工作线程。如果有则会一直循环直到当前线程创建成功了工作线程,线程池工作线程数量加1。然后紧接着判断当前工作线程池中的数量是否超过相应的数量限制。如果超过了限制,则直接return。上述代码的//label1处 。并将线程池线程数量减1


如果工作线程创建成功,需要将该工作线程加入到Hashset中,由于HashSet并不是线程安全的。所以需要在该代码块(//label2)处使用同步锁。然后start线程。


至此我们已经明白,工作线程是怎样创建并运行的。接下来我们来看,工作线程运行后都干了些什么。代码在runWorker(Worker w)中

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
 private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

我们来解析该源码


1.线程启动后我们主要是通过一个while循环来让线程一直工作。通过getTask()向任务队列取任务。注意getTask()可以是一个阻塞方法,也可以是一个超时阻塞方法。什么意思?如果是一个阻塞方法,那么如果任务队列没有工作任务,那么该方法会一直等待,直到取出工作任务。如果是一个超时等待的阻塞方法,如果在超时时间内,任务队列还是没有工作任务,那么getTask()将会返回null,那么该工作线程也就跳出了while循环。结束了该工作线程。keepAliveTime的实现机制便是使用了超时等待的阻塞方法


2. 超时等待,是如何实现将线程池中大于corePoolSize的线程结束掉


考虑这样一种情况,假设corePoolSize为10,maximumPoolSize为20,任务队列大小为80。可能在某一个时刻我们让线程池执行100个持续时间1-2分钟的任务。那么此时线程池中的工作线程个数应该为20。任务队列也都被填满了。当所有的工作任务都被执行完。那么此时线程池中的工作线程个数依然为20,工作队列中任务数为0。那么此时线程池中20个工作线程将会显的比较浪费,我们需要终止掉一些多余的工作线程。我们有没有办法能够使得当corePoolSize个线程和工作队列不够用的时候动态扩大线程池的工作线程个数。而当工作任务比较少时,将线程池中的工作线程数缩小到corePoolSize呢。答案是肯定的。主要是通过超时阻塞获取任务队列的任务。如果超时了,则返回null。从而让工作线程跳出循环,达到结束线程的目的



相关文章
|
6月前
|
监控 Java 调度
Java线程池ThreadPoolExecutor初略探索
Java线程池ThreadPoolExecutor初略探索
|
4月前
|
监控 Java
ThreadPoolExecutor 介绍
ThreadPoolExecutor 介绍
45 0
|
4月前
|
Java
ThreadPoolExecutor 使用
ThreadPoolExecutor 使用
35 0
|
6月前
|
Java
线程池ThreadPoolExecutor总结
线程池ThreadPoolExecutor总结
|
7月前
|
缓存 搜索推荐 Java
线程池之ThreadPoolExecutor
线程池之ThreadPoolExecutor
75 0
|
机器学习/深度学习 消息中间件 存储
ThreadPoolExecutor解读
ThreadPoolExecutor解读
|
存储 缓存 监控
ThreadPoolExecutor:线程池不允许使用Executors创建
ThreadPoolExecutor:线程池不允许使用Executors创建
384 0
ThreadPoolExecutor:线程池不允许使用Executors创建
|
存储 缓存 监控
线程池 ThreadPoolExecutor 详解
对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破坏了数据的局部性。因此在并发编程中,当线程创建过多时,会影响程序性能,甚至引起程序崩溃。 而线程池属于池化管理模式,具有以下优点: 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的性能消耗。 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性:能够对线程进行统一分配、调优和监控。
220 0
|
网络协议 Java
Java并发:线程池详解(ThreadPoolExecutor)
Java并发:线程池详解(ThreadPoolExecutor)
173 0
线程池之ThreadPoolExecutor使用
通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景