JUC基础——线程池(2)

简介: JUC基础——线程池

三、线程池状态

每个线程池都会带有一个原子整型,用来表示自己的状态


按bit位来分,(高3位)标记线程池状态,(低29位)表示线程个数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


那么线程池总共有多少种状态呢?一共有五种,它们的变化关系如下:


98c83a2beaf04a4ea7544fae7457e693.png


  • RUNNING
  • 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,
  • 就处于RUNNING状态,并且线程池中的任务数为0!
  • SHUTDOWN
  • 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  • 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
  • STOP
  • 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
  • TIDYING
  • 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
  • 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
  • TERMINATED
  • 状态说明:线程池彻底终止,就变成TERMINATED状态。
  • 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

四、线程池的任务提交

在经过上一章的学习后,你应该已经得到了一个ExecutorService实例了,此时我们可以通过两种方式提交任务(Runnable):


tp.execute(runnable)

tp.submit(runnable) 或 tp.submit(task)


这两者我们该怎么选呢,下面我们就来详细讲一下


1. execute

9a93ee4ffe10499b8dfa37ce2cd9209d.png


execute只能用来执行runnable的实现类,而且没有返回值,事实上,这是因为run()本身就没有返回值导致的,因此这种方式,最好用来执行不需要知道结果的任务。

注意:当出现异常时,异常会被catch住,然后throw出来,本线程销毁


2. submit

29416ddf980541778bd01b6093c40d0b.png


submit其实内层调用的还是execute,此时传入execute的参数类型是RunnableFuture,同时继承Runnable 和 Future


submit可以用来提交Runnable或者Callable,Callable任务带有返回值,因此submit会有一个Future返回很合理,通过这个future.get()就能获取返回值;


但是runnable任务没有返回值,为什么也有一个Future返回呢?其实是你传的Runnable 最后还是会被封装成Callable后再执行,由于Runnable没有返回结果,所以在将Runnable包装为Callable的时候,会传入一个预期结果null,此时使用get方法返回一个null


注意:当使用的submit时,得益于FutureTask中有try-catch来存储异常,所以出现异常,FutureTask自己就消化并存起来,并可通过future.get()获取到异常,而不是直接往外抛,因此直接使用submit是不会报错的,线程池里的线程得以存活


五、线程执行异常

我们提交的任务,不总是能顺利执行,一旦出现异常,我们该怎么处理呢?

  1. 190bbebab0bd49479b4eae21e32cf499.png

  2. 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常。这种方法比较简单,也有他的局限性,不够灵活且增大代码量

  3. 使用submit提交任务,在调用future.get()方法时,会将保存的异常重新抛出

  4. 在执行任务的过程中,如果出现异常,也可以通过自己写个类,继承ThreadPoolExecutor并重写该afterExecute()方法来处理,注意,此时线程还是因异常而终止了。
  5. 48f7406a98304207b96421da3b562c33.png

  6. 当一个线程因为未捕获的异常而退出时,JVM会把这个事件报告给应用提供的UncaughtExceptionHandler异常处理器,于是就有了第三种解决任务代码抛出异常的方案:为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常。

那如何为工作者线程设置UncaughtExceptionHandler呢?ThreadPoolExecutor的构造函数提供一个ThreadFactory,可以在其中设置我们自定义的UncaughtExceptionHandler,这里不再赘述。

注意:这个方案不适用于使用submit方式提交任务的情况,原因上面也提到了,FutureTask的run方法捕获异常后保存,不再重新抛出,意味着runWorker方法并不会捕获到抛出的异常,线程也就不会退出,也不会执行我们设置的UncaughtExceptionHandler。


六、线程池执行步骤(简易)

这一章,我们会说一说当一个任务被提交进线程池,会经历什么步骤?我们可以看以下的精简步骤:


任务会优先以核心线程运行,当核心线程达到上限时,再往里面提交线程,会把线程放入队列中等待。除非队列放不下了,才会启用非核心线程来运行任务。所以不要用无界队列。如果非核心线程也满了,则执行拒绝策略

34161ba6e7814de2a0a67f243a058f5c.png



当然上面说的流程是一个大体方向,具体的细节我们只能通过源码来讲,如果你有源码恐惧症,我也给你提了一个精简版源码流程,如果你也喜欢看源码,可以看下一章的源码级流程。


  1. 我们先去创建一个Worker(内含一个线程) 并且把我们的任务传到Worker的firstTask变量里
  2. Worker创建完成以后调用runWorker方法;
  3. runWorker方法里面先把Worker自己的firstTask走完(调用runnable.run()),然后会通过getTask()方法从线程池的阻塞队列里面拿缓存的runnable
  4. 如果当前线程数超核心线程上限,getTask会以 poll(timeout, unit)取任务,一段时间取不到,就会返回null,Worker内部Thread的run方法因为没有后续任务而走完,线程生命周期结束;
  5. 如果当前线程数没有超核心线程上限,从队列拿任务时,是以take方法去拿,此时会让线程挂起,直到取到任务再返回
  6. 取到任务以后再去执行这个任务

七、线程池执行步骤(源码)

1.提交任务,判断是否新建线程执行,或者加入阻塞队列

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取当前线程池状态,包括线程数的情况    
    int c = ctl.get();
    // 当前线程数小于指定的核心线程,直接加worker核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 代表大于核心线程数,或者加worker失败,加入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 二次检查,如果当前线程池已关闭,则拒绝该任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 添加队列成功,但没有可用线程(如指定核心线程为0且当前没有非核心线程),
        // 以无命令 - 非核心形式新加worker,来执行队列里的任务   
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 代表加入阻塞队列失败,加非核心worker
    else if (!addWorker(command, false))
    // 代表加非核心worker,执行拒绝(内容由拒绝策略实现)
        reject(command);
}

2.新增worker(加任务,将任务新建线程,并启动线程来执行)

private boolean addWorker(Runnable firstTask, boolean core) {
    // retry循环,本循环主要判断线程池状态是否支持新增一个worker
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    // 这个地方直接判断添加worker失败需要满足以下条件
    // running是负数,shutdown是0,所以要求线程池的状态是大于等于shutdown,即
    // 停工、停止、整理、终结状态,但有一种情况除外,即线程池状态是SHUTDOWN 
    // 但是task是null且队列不为空时,对应的场景是addWorker(null, false)
   // 这是新建一个无命令的保底线程,执行阻塞队列里面的任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 小循环,本循环主要目的为循环原子操作增加worker
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 原子性增加活动线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 原子性增加活动线程失败,查看最新的活动线程数和状态
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态变化了,跳出小循环,重新retry循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 这个是一个排他锁,只允许线程一个个排队进入执行
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 线程池是否在运行,或者是否是保底线程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                // 如果这个worker带着的Thread已经在运行了,说明有问题,抛异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3.任务执行

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 这个task(runnable)就是当初我们执行execute方法传入的参数,
    // 也就是我们要去完成的任务
    Runnable task = w.firstTask;
    // worker会被复用,它的firstTask主动设置成null,方便java垃圾回收机制回收。
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环执行,任务来自于传入的task,或队列里的task
        while (task != null || (task = getTask()) != null) {
            w.lock();
          // 双重检查,确保线程中断和线程池stop是强关联,即线程池stop时
          // 线程一定是被标记中断的,如果线程池不是stop,要把线程中断标志去掉,
          // 去掉标记后,再检查一遍线程池状态,如果此时状态变成stop了,
          // 还得把标志恢复回去
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 一个回调方法,可以继承重写在里面做一些想做的事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 待实现,异常的后置处理可以写这里
                    afterExecute(task, thrown);
                }
            } finally {
                // 将task置空,否则出不了循环
                task = null;
         // 给当前的worker标记,告诉他你又完成了一个任务,如果是通过task!=null 
         // 进来的这个completedTasks++完以后肯定是1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // worker执行完所有任务 后续处理
        processWorkerExit(w, completedAbruptly);
    }
}

4.获取其他任务 (获取阻塞队列里的任务Task)

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 线程池状态为stop,或者阻塞队列空了,减少worker,直接返回空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // 上面的if判断如果过去了,那当前状况其实只有两种
        // 一种是Running 另外一种是shutDown但是workQueue不为空
        // 这两种状况都会继续让worker干活
        int wc = workerCountOf(c);
        // worker可以过期吗? 
        // 如果线程数超核心线程限制,或核心线程也有超时限制,则返回true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 分为两部分看
        // 第一部分(wc > maximumPoolSize || (timed && timedOut)
        // 判断需不需要减少worker, 有需要则判断
        // 第二部分(wc > 1 || workQueue.isEmpty())
        // 判断有没有能力,当前条件能不能减少worker
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 是否需要过期一些线程, 如果是,则以超时限制从队列中取一个任务,
            // 如果取不到然后超时了,则返回null,取得到则返回task
            // 如果不会过期,则以take方式阻塞住,直到返回一个任务
            // 这里是线程复用或终结的关键,
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 执行到这里,说明如果有时限,worker肯定没获取到新任务而且超时了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

5.执行完task后的后续处理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)
        // 异常退出,worker数量减一
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 汇总该worker(一个worker实际上就是一个线程)完成的任务数量到线程池
        completedTaskCount += w.completedTasks;
        // 将本worker从worker集合中剔除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //退出woker的时候,检查下线程池状态
    tryTerminate();
    int c = ctl.get();
    // 线程池的状态是runing 或者 shutdown
    if (runStateLessThan(c, STOP)) {
        // 是因为没有任务了,而非线程异常,才要终止这个线程的
        if (!completedAbruptly) {
            // 保留的最小线程
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 执行到这里,说明有异常,或者当前活跃线程小于最小要求,建个保底线程
        addWorker(null, false);
    }
}

总结

上面我们已经非常详细的讲解了线程池的方方面面,对于线程的使用,注意事项乃至原理,应该都有相当深刻的了解了,如果你有什么补充和意见,也欢迎评论区留下你的想法


目录
相关文章
|
8月前
|
存储 Java 数据安全/隐私保护
【JUC】ThreadLocal 如何实现数据的线程隔离?
【1月更文挑战第15天】【JUC】ThreadLocal 如何实现数据的线程隔离?ThreadLocal 导致内存泄漏问题?
|
8月前
|
安全 算法 Java
剑指JUC原理-19.线程安全集合(上)
剑指JUC原理-19.线程安全集合
57 0
|
4月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
|
3月前
|
Java C++
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
40 0
|
4月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
220 6
|
5月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
5月前
|
设计模式 Java 调度
JUC线程池: ScheduledThreadPoolExecutor详解
`ScheduledThreadPoolExecutor`是Java标准库提供的一个强大的定时任务调度工具,它让并发编程中的任务调度变得简单而可靠。这个类的设计兼顾了灵活性与功能性,使其成为实现复杂定时任务逻辑的理想选择。不过,使用时仍需留意任务的执行时间以及系统的实际响应能力,以避免潜在的调度问题影响应用程序的行为。
96 1
|
5月前
|
Java API 调度
JUC线程池: FutureTask详解
总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
47 0
|
5月前
|
Java 程序员 容器
【多线程面试题二十四】、 说说你对JUC的了解
这篇文章介绍了Java并发包java.util.concurrent(简称JUC),它是JSR 166规范的实现,提供了并发编程所需的基础组件,包括原子更新类、锁与条件变量、线程池、阻塞队列、并发容器和同步器等多种工具。
|
7月前
|
存储 安全 Java
Java多线程编程--JUC
Java多线程编程