线程池夺命十四问

简介: 线程池夺命十四问

一:什么是线程池

线程池:顾名思义就是一个管理线程的容器,当有任务需要处理的时候,放进任务队列里面,由线程池分配空闲的线程处理任务,处理完任务的线程不会被销毁,而是在线程池中等待下一个任务

二:线程池有什么好处

1.降低资源消耗: 频繁的创建与销毁线程,占用大量资源,线程池的出现避免了这种情况,减少了资源的消耗;

2.提高响应速度: 因为线程池中的线程处于待命状态,有任务进来无需等待线程的创建就能立即执行(前提是有空闲线程,任务量巨大,还是需要排队的哈);

3.更好的管理线程: 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

三:如何创建一个线程池

在Java中有两类方法可以创建一个线程池,如下:

Executors

1.Executors.newFixedThreadPool:创建一个固定大小的线程池,并且可以控制并发线程的数量,多余的线程会放在任务队列中等待执行

2.Executors.newCacheThreadPool:创建一个可缓存的线程池,若线程数量超过了线程池设置的参数,会缓存一段时间后回收

3.Executors.newSingleThreadPool:创建单个线程的线程池,他可以保证线程的顺序执行

4.Executors.newScheduledThreadPool:创建可以执行延迟任务的线程池

5.Executors.newSingleThreadScheduleExecutors:创建一个单线程可以执行延迟任务的线程池,是3和4的结合体

6.Executors.newWorkStealingPool:创建一个抢占式的线程池(是根据CPU的核数来确定的)

ThreadPoolExecutors

7.ThreadPoolExecutors:通过手动创建线程池,需要配置参数

四:创建一个线程池为什么不推荐使用Executors

如果大家跟入到Executors这些方法的底层实现中去看一眼的话,立马就知道原因了,像FixedThreadPool 和 SingleThreadExecutor这两个方法内使用的是无界的 LinkedBlockingQueue存储任务,任务队列最大长度为 Integer.MAX_VALUE,这样可能会堆积大量的请求,从而导致 OOM。 而CachedThreadPool使用的是同步队列 SynchronousQueue, 允许创建的线程数量也为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM,其他的方法所提供的均是这种无界任务队列,在高并发场景下导致OOM的风险很大,故大部分的公司已经不建议采用Executors提供的方法创建线程池了。

五:如何设置线程池的大小

通常来说设置线程数是根据业务需求和实际情况来确定的,原因如下

1.如果当前业务需求的任务类型是多IO类型的(读和写操作比较多)话,线程数可以设置的多一些(2n-1),如果业务需求的任务类型是多cpu计算型的话,那么线程数可以设置为n-1

2.线程数还与cpu的核心数有关,因为我们创建的线程池中本质还是多个线程,而Java线程是与操作系统一一对应,因此也要考虑cpu的情况

六:线程池有哪些参数

在线程池中有七大参数

1.核心线程数:即当前线程池种常驻的线程数量,如果是0的话,则会销毁线程池

2.最大线程数:当有最多任务的时候,该线程池中所有的线程数量

3.临时线程存活时间:如果线程池空闲,除了核心线程以外的临时线程,在超过改时间后会消亡

4.临时线程存活时间单位:临时线程的存活时间单位,一般为毫秒

5.任务阻塞队列:当线程池中所有的线程都在执行任务时,又有新的任务来时,会放进任务阻塞i队列中。

6.线程工厂:是一个创建线程池的工厂,不设置该参数时,用的是默认的线程工厂

7.拒绝策略:当任务队列都已经满了的时候,此时就会触发拒绝策略。表示不在接收新的任务

七:线程池有哪些状态

通常线程池有以下5中状态

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING: 线程池一旦被创建,就处于 RUNNING 状态,任务数为 0,能够接收新任务,对已排队的任务进行处理。
  • SHUTDOWN: 不接收新任务,但能处理已排队的任务。调用线程池的 shutdown() 方法,线程池由 RUNNING 转变为 SHUTDOWN 状态。
  • STOP: 不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。调用线程池的 shutdownNow() 方法,线程池由(RUNNING 或 SHUTDOWN ) 转变为 STOP 状态。
  • TIDYING: 1)SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。

2)线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池就会由 SHUTDOWN 转变为 TIDYING 状态。

3)线程池在 STOP 状态,线程池中执行中任务为空时,就会由 STOP 转变为 TIDYING 状态

  • TERMINATED: 线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会由 TIDYING 转变为 TERMINATED 状态。

5种状态转换如下:

八:线程池的执行流程

1、刚new出来的线程池里默认是没有线程的,只有一个传入的阻塞队列;

2、当我们执行execute提交一个方法后,会判断当前线程池中线程数是否小于核心线程数(corePoolSize),如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务;

3、如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。

4、如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。

5、如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,调用RejectedExecutionHandler.rejectedExecution()方法。

源码如下:

public void execute(Runnable command) {
    // 首先检查提交的任务是否为null,是的话则抛出NullPointerException。
    if (command == null)
        throw new NullPointerException();
 
    // 获取线程池的当前状态(ctl是一个AtomicInteger,其中包含了线程池状态和工作线程数)
    //private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    int c = ctl.get();
 
    // 1. 检查当前运行的工作线程数是否少于核心线程数(corePoolSize)
    if (workerCountOf(c) < corePoolSize) {
        // 如果少于核心线程数,尝试添加一个新的工作线程来执行提交的任务
        // addWorker方法会检查线程池状态和工作线程数,并决定是否真的添加新线程
        if (addWorker(command, true))
            return;
        // 重新获取线程池的状态,因为在尝试添加线程的过程中线程池的状态可能已经发生变化
        c = ctl.get();
    }
 
    // 2. 尝试将任务添加到任务队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查线程池的状态
        if (! isRunning(recheck) && remove(command))  // 如果线程池已经停止,从队列中移除任务
            reject(command);
        // 如果线程池正在运行,但是工作线程数为0,尝试添加一个新的工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 如果任务队列满了,尝试添加一个新的非核心工作线程来执行任务
    else if (!addWorker(command, false))
        // 如果无法添加新的工作线程(可能因为线程池已经停止或者达到最大线程数限制),则拒绝任务
        reject(command);
}

九:如何判断一个线程池的中的任务是否执行完毕

在Java中通常有两种方式判断线程池中线程是否已经执行完毕

1.使用getTakCount总任务数和getCompletedTaskCount已经执行完毕的线程数相比较来判断,该方法缺点是在实际开发中线程池是公用的,因此总任务数是在一直不断变化的,并且得到的也是一个近似值,因此不推荐使用

2.使用FutureTask中的get()方法是等待所有线程执行完毕后才返回结果。

十:线程池的拒绝策略有哪些

在Java中有两类拒绝策略:内置的拒绝策略,自定义拒绝策略

1.在内置的拒绝策略中有四种:

1.AbortPolicy:即默认的拒绝策略,只会抛出一个异常消息

2.CallerRunsPolicy:即哪个线程提交过来的任务,就返回给哪个线程,是不会用线程池中的线程处理

3.DiscardPolicy:即默默丢弃任务,不会做任何的提醒操作

4.DiscardOldestPolicy:即丢弃任务队列中最旧的一个任务,并尝试重新提交任务

自定义拒绝策略

 

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * @author: dlwlrma
 * @data 2024年09月17日 19:47
 * @Description: TODO:模拟自定义线程池拒绝策略
 */
public class ThreadDemo {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
 
            @Override
            public void run() {
                System.out.println("线程名称:"+Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
 
        //创建线程,线程的任务队列长度为1
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //自定义线程池拒绝策略
                System.out.println("执行自定义拒绝策略");
            }
        });
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
}
}

实际开发中,我们不会使用内置的拒绝策略,是因为当内置的拒绝策略过于简单,不利于维护,因此常用自定义的拒绝策略,例如在自定义的拒绝策略中加入mq,当触发了拒绝策略时,发布者会收集异常信息,然后订阅者会根据异常id去处理异常等等

十一:线程池的线程是如何复用的

线程池的核心功能就是实现线程的重复利用,那么线程池是如何实现线程的复用呢? 线程池通过addWorker()方法添加任务,而在这个方法的底层会将任务和线程一起封装到一个Worker对象中,Worker 继承了 AQS,也就是具有一定锁的特性。

然后Worker中有一个run方法,执行时会去调用runWorker()方法来执行任务,我们来看一下这个方法的源码:

final void runWorker(Worker w) {
    // 获取当前工作线程
    Thread wt = Thread.currentThread();
    
    // 从 Worker 中取出第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    
    // 解锁 Worker(允许中断)
    w.unlock(); 
    
    boolean completedAbruptly = true;
    try {
        // 当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行
        while (task != null || (task = getTask()) != null) {
            // 锁定 Worker,确保在执行任务期间不会被其他线程干扰
            w.lock();
            
            // 如果线程池正在停止,并确保线程已经中断
            // 如果线程没有中断并且线程池已经达到停止状态,中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            
            try {
                // 在执行任务之前,可以插入一些自定义的操作
                beforeExecute(wt, task);
                
                Throwable thrown = null;
                try {
                    // 实际执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后,可以插入一些自定义的操作
                    afterExecute(task, thrown);
                }
            } finally {
                // 清空任务,并更新完成任务的计数
                task = null;
                w.completedTasks++;
                // 解锁 Worker
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 工作线程退出的后续处理
        processWorkerExit(w, completedAbruptly);
    }
}

在这段源码中我们看到了线程被复用的原因了,就是这个while的循环,当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行;如果从 getTask 获取不到方法的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。

十二:线程池中的阻塞队列有哪些

常见的阻塞队列有以下几种

// 1、无界队列 LinkedBlockingQueue,容量Integer.MAX_VALUE
public static ExecutorService newFixedThreadPool(int nThreads) {
 
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
 
}
 
// 1、无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
 
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
 
}
 
// 2、同步队列 SynchronousQueue,没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务,因此线程最多可创建Integer.MAX_VALUE个。
public static ExecutorService newCachedThreadPool() {
 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
 
}
 
// 3、DelayedWorkQueue(延迟阻塞队列),添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

十三:线程池中的任务出现异常后,是复用还是销毁呢

这个问题我们要分两种情况去分析,第一种是通过 execute() 提交任务时,在执行过程中抛出异常,且没有在任务内被捕获,当前线程会因此终止,异常信息会记录在日志或控制台中,并且线程池会移除异常线程,重新创建一个线程补上去。

第二种通过submit()提交任务时,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。


通过submit()的底层源码发现,其实它的内部封装的是execute方法,只不过它的任务被放在了RunnableFuture对象里。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
     RunnableFuture<Void> ftask = newTaskFor(task, null);
     execute(ftask);
     return ftask;
  }

execute方法会抛出异常终止线程的,为什么submit中不会呢,那么肯定在RunnableFuture里,经过一顿跟踪发现,这个Future中实现的run方法,对异常进行了捕获,所以并不会往上抛出,也就不会移除异常线程以及新建线程了。

十四:如何关闭一个线程池

在JDK 1.8 中,线程池的停止一般使用 shutdown()、shutdownNow()这两种方法。

1.shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
    mainLock.lock(); // 加锁以确保独占访问
 
    try {
        checkShutdownAccess(); // 检查是否有关闭的权限
        advanceRunState(SHUTDOWN); // 将执行器的状态更新为SHUTDOWN
        interruptIdleWorkers(); // 中断所有闲置的工作线程
        onShutdown(); // ScheduledThreadPoolExecutor中的挂钩方法,可供子类重写以进行额外操作
    } finally {
        mainLock.unlock(); // 无论try块如何退出都要释放锁
    }
    tryTerminate(); // 如果条件允许,尝试终止执行器
}

在shutdown的源码中,会启动一次顺序关闭,在这次关闭中,执行器不再接受新任务,但会继续处理队列中的已存在任务,当所有任务都完成后,线程池中的线程会逐渐退出。

2.shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks; // 用于存储未执行的任务的列表
    final ReentrantLock mainLock = this.mainLock; // ThreadPoolExecutor的主锁
    mainLock.lock(); // 加锁以确保独占访问
    try {
        checkShutdownAccess(); // 检查是否有关闭的权限
        advanceRunState(STOP); // 将执行器的状态更新为STOP
        interruptWorkers(); // 中断所有工作线程
        tasks = drainQueue(); // 清空队列并将结果放入任务列表中
    } finally {
        mainLock.unlock(); // 无论try块如何退出都要释放锁
    }
    tryTerminate(); // 如果条件允许,尝试终止执行器
    return tasks; // 返回队列中未被执行的任务列表
}

与shutdown不同的是shutdownNow会尝试终止所有的正在执行的任务,清空队列,停止失败会抛出异常,并且返回未被执行的任务列表。

相关文章
|
5月前
|
Java C++
关于《Java并发编程之线程池十八问》的补充内容
【6月更文挑战第6天】关于《Java并发编程之线程池十八问》的补充内容
49 5
|
5月前
|
存储 并行计算 监控
为师妹写的《Java并发编程之线程池十八问》被表扬啦!
【6月更文挑战第5天】为师妹写的《Java并发编程之线程池十八问》被表扬啦!
55 7
|
5月前
|
Java Apache Spring
面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
【6月更文挑战第3天】面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
38 0
|
6月前
|
缓存 Java 程序员
程序员的金三银四:创建线程池有哪几种方式?
程序员的金三银四:创建线程池有哪几种方式?
63 0
|
存储 缓存 Java
线程池之刨根问底
线程池之刨根问底
123 0
线程池之刨根问底
阿里巴巴面试题- - -多线程&并发篇(二十七)
阿里巴巴面试题- - -多线程&并发篇(二十七)
阿里巴巴面试题- - -多线程&并发篇(二十六)
阿里巴巴面试题- - -多线程&并发篇(二十六)
阿里巴巴面试题- - -多线程&并发篇(二十五)
阿里巴巴面试题- - -多线程&并发篇(二十五)
|
数据采集 算法 Java
库调多了,都忘了最基础的概念-《线程池篇》
库调多了,都忘了最基础的概念-《线程池篇》
128 0
库调多了,都忘了最基础的概念-《线程池篇》
|
Java 开发者
手撕源码!线程池核心组件源码剖析
看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。
164 0
手撕源码!线程池核心组件源码剖析