每日一博 - Review线程池(下)

简介: 每日一博 - Review线程池(下)

任务执行机制

任务调度


任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。这就是线程池的核心运行机制。


首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。


首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。


如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。


如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。


如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。


如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。


流程图如下


d14f656c459741bd9fc991e0a4a6c2ea.png


67c7f5d9956c488090ffc65908fc0c7d.png


任务缓冲


任务缓冲模块是线程池能够管理任务的核心部分。


线程池的本质是对任务和线程的管理,而做到这一点关键在于将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。


线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。


阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。


在队列为空时,获取元素的线程会等待队列变为非空。

当队列满时,存储元素的线程会等待队列可用。


阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

b1030de8b8c24e2d9aafd73438308edd.png

【线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素】


c07aac87af594b6a9b7dd32090bd6002.png


使用不同的队列可以实现不一样的任务存取策略。


34e9b92a4d69428db52e2d8931500a63.png


任务申请


任务的执行有两种可能:

  • 一种是任务直接由新创建的线程执行。
  • 另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。

第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。


线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现。

  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }



getTask 进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。


d05f8290e6084447a586d1b4e99d9557.png


任务拒绝


任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

3de481f380b345a185afb468c3f86caa.png


可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略


478733a1a23e4edc8f79d9e5ea171278.png


Worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker


3974f28f27a34aeeaf7ae2b8b2125f2c.png


Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。


thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;

firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。


如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。


97c65be7c01645aa9a718309390a85ee.png


线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。


Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。


1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。

2.如果正在执行任务,则不应该中断线程。

3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。


在线程回收过程中就使用到了这种特性,回收过程如下图所示:

87b3132b7e644aad86a0e9e5b160816f.png

增加Worker线程


增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。


addWorker方法有两个参数:firstTask、core。


  • firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;
  • core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize 。


其执行流程如下图所示:



71875873c25c43a3b029b60a9d8048cb.png


  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


回收Worker线程


线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。


Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。


当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。


07265abb1adb4b16b97cbeb534d98d89.png

20e2cb04431342e89afd5a35eeb00a3a.png

线程回收的工作是在processWorkerExit方法完成的。


ef92ae328810495b9e8f6b8bf1624173.png


事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。


Worker线程执行任务


在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:


bdd872803d9e41daa559d189a97fb2e5.png

8efd12daa8f44013a155f3767c1ea8d1.png


1.while循环不断地通过getTask()方法获取任务。

2.getTask()方法从阻塞队列中取任务。

3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。

4.执行任务。

5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。


c9874d060be646cda7c8bfda6671df6b.png


aa2d2825e36d433c87b9d842b00e3887.png


参考: Java线程池实现原理及其在美团业务中的实践

相关文章
|
存储 缓存 Java
每日一博 - Review线程池
每日一博 - Review线程池
127 0
|
安全 Java API
Java Review - SimpleDateFormat线程不安全原因的源码分析及解决办法
Java Review - SimpleDateFormat线程不安全原因的源码分析及解决办法
365 0
|
Java C++
Java Review - 线程池中使用ThreadLocal不当导致的内存泄漏案例&源码分析
Java Review - 线程池中使用ThreadLocal不当导致的内存泄漏案例&源码分析
294 0
|
Java
Java Review - 线程池使用FutureTask的小坑
Java Review - 线程池使用FutureTask的小坑
141 0
|
Java
Java Review - 线程池资源一直不被释放案例&源码分析
Java Review - 线程池资源一直不被释放案例&源码分析
589 0
|
Java Go
Java Review - 创建线程和线程池时建议指定与业务相关的名称
Java Review - 创建线程和线程池时建议指定与业务相关的名称
158 0
|
存储 缓存 监控
每日一博 - Review线程池_02
每日一博 - Review线程池_02
188 0
|
存储 Java 调度
每日一博 - Review线程池(上)
每日一博 - Review线程池(上)
197 0
|
缓存 并行计算 安全
Java Review(三十七、多线程)
Java Review(三十七、多线程)
256 0
Java Review(三十七、多线程)
|
2月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
152 6

热门文章

最新文章