线程池
池化思想:线程池、数据连接池等,比如我们 Spark 的 Executor 就是典型的线程池,用户在启动 Spark 作业的同时启动线程池,这样 Spark 的 Task 就可以直接获取资源,而不用像 MR 程序那样等待容器上的进程开启了。
如果不使用线程池的话,我们需要:
- 手动创建线程对象
- 执行任务
- 执行完毕,回收资源
优点:
- 提高线程的利用率
- 提高程序的响应速度(线程对象提前创建好的,节省了创建和销毁的开销)
- 便于统一管理线程对象
- 可以控制最大的并发数
1、Java 创建线程池示例
1.1、构造参数解释
我们先看看 ThreadPoolExecutor 的构造器的源码:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); // 统一转为亚秒格式 this.threadFactory = threadFactory; this.handler = handler; }
参数大概意思就是:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列(存放任务)、线程创建工厂、拒绝策略。我们也可以通过下面的例子理解一下:
线程池就好比我们的银行,它有很多个接待客人的柜台(一个柜台就是一个线程,用来执行任务)和用来供客人等待的座位(等待的执行的任务)。上面的图中,我们有 5 个柜台(对应第2个参数:maximumPoolSize) ;而其中绿色的三个柜台代表常驻柜台(也就是一直都有人,对应第1个参数:corePoolSize);红色的柜台代表预备柜台,也就是当业务繁忙的时候,最多还可以打电话摇两个人来(maximumPoolSize - corePoolSize);但是摇人来帮忙是有时效性的,如果帮完忙一段时间(取决于第3个和第4个参数)没有活干,这些线程就会被释放;摇人的方式取决于第6个参数(下面我们是通过默认的线程工厂来再创建线程的);蓝色的等候区区域代表座位(可允许等待的任务数量),当柜台前和座位都满了的时候,如果再有任务进来就会被拒绝,具体的拒绝方式取决于第7个参数,下面我们给出的拒绝策略是直接报错:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPool { public static void main(String[] args) { // 1.核心线程数 2.最大线程数 3.存活时间 4.时间单位 5.等待队列 6.线程工厂 7.拒绝策略(直接抛异常) ExecutorService executorService = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 9; i++) { executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"正在工作"); }); } // 关闭线程池 executorService.shutdown(); } }
- 使用线程池中执行任务很简单,我们不需要关心具体是哪个线程执行的,只需要把任务丢给它即可(通过 lambda 表达式来告诉线程池我们的任务执行逻辑)
上面的案例中,我们的线程池的最大接收的任务量是 8 (最大线程数:5 + 等待队列容量:3),但并不是说只能跑8个任务,如果有任务释放资源仍然可以继续执行任务。
所以,上面的案例同一时刻最多只能共存8个任务(其中最多只有五个任务会同时执行),如果,当我们的任务超过8时,会直接报错(因为我们设置了拒绝策略就是直接抛异常)。
2、ThreadExecutorPool 源码解析
创建一个包含 10 个核心线程、总线程数为 20、存活时间为 0 s、拒绝策略为直接抛异常的一个线程池对象:
public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); executor.execute(new Runnable() { @Override public void run() { } }); }
2.1、线程池保活与回收源码分析
2.1.1、execute 方法源码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果总线程数小于核心线程数 if (addWorker(command, true)) // 添加一个核心线程 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 如果当前全部线程都在工作就把任务添加到阻塞任务队列当中 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 如果上面没有成功把任务加到队列就新加一个线程 reject(command); // 如果新加线程失败(比如超过最大线程数)就拒绝任务 }
这里的注释已经说的很明白了:
- 如果正在运行的线程数少于核心线程数(corePoolSize),则尝试启动一个新线程,并将给定的命令参数作为其第一个任务。调用addWorker方法可以原子性地检查运行状态和工作线程数,从而防止在不应该添加线程时发出错误的警报,通过返回false来实现。
- 如果任务能够成功入队,我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有线程死亡)或者线程池在进入此方法后已经关闭。所以我们重新检查状态,如果有必要回滚入队操作(如果已停止),或者如果没有线程,则启动一个新线程。
- 如果我们无法将任务入队,则尝试添加一个新的线程。如果添加失败,我们知道线程池已经关闭或饱和,因此拒绝该任务。
所以在上面我们创建线程池的代码中,并不是线程池创建好之后就会立马创建 10 个核心线程,而是真正有任务来的时候才会去新创建一个线程。
思考:如果线程的任务结束了,线程对象会怎么样呢?
从创建线程池的构造器就不难想到,构造器中有一个阻塞队列的参数,其实当线程没有任务的时候,线程并不会关闭,而是一直阻塞,也叫保活。
保活线程的关键在于阻塞队列,即LinkedBlockingDeque。当队列为空时,如果线程尝试从队列中取元素,线程会被阻塞,直到队列中有元素可供取出。这样,线程就会在等待任务的过程中保持活跃状态。
2.1.2、getTask 方法源码
getTask 方法是 runWorker 方法里的调用的,而 runWoker 又是 Worker 对象的方法,这个 Worker 实例又是上面的 execute 方法中的 addWorker 方法中实例化出来的。
getTask 方法是所有线程池中的线程一直在不断调用检查的(在 runTask 方法中的 while 循环中被调用):
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 总线程数 int wc = workerCountOf(c); // 判断是否需要超时回收线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 回收超时的线程 workQueue.take(); // 阻塞 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
方法解释:
如果满足以下任一条件,该工作线程将返回null并退出:
- 当前线程数超过了最大线程数(由于调用了setMaximumPoolSize方法)。
- 线程池已停止。
- 线程池已关闭且队列为空。
- 该工作线程在等待任务时超时,并且超时的工作线程会受到终止处理(即allowCoreThreadTimeOut || workerCount > corePoolSize),无论是在超时等待之前还是之后。如果队列非空,则该工作线程不是线程池中的最后一个线程。
返回值:
如果满足上述条件之一,返回null,表示工作线程必须退出,此时workerCount会减1。
否则,返回task,表示成功获取到任务。
从源码中可以看到,线程池中的工作线程会执行getTask()方法来获取任务。在这个方法中,线程会调用workQueue.poll()或workQueue.take()方法来尝试从LinkedBlockingDeque中获取任务。如果队列为空,这些方法会使线程阻塞,直到有新的任务添加到队列中。这就是线程在没有任务执行时仍然保持活跃(保活)的机制。
- workQueue.take():当队列为空时,该方法会无限期地等待,直到有新的任务被添加到队列中。这意味着,如果所有核心线程都在执行任务并且队列为空,那么调用 take() 方法的线程会一直阻塞,直到其他线程向队列中添加了新的任务。
- workQueue.poll():此方法在队列为空时会立即返回null,而不是等待。这通常用于非核心线程(也称为工作线程),当没有任务可做时,这些线程可以选择终止自己以减少资源占用。
思考:现在的核心线程数是 10,如果此时正在工作的线程有 11 个(10个核心线程,一个其它线程),那如果所有线程的任务都完成了,那么线程池又会执行怎样的逻辑呢?
2.1.3、runWorker 方法源码
在上面的 getTask 方法中,线程池中的每个线程在调用这个方法的时候都会判断是否需要回收线程:
// 判断是否需要超时回收线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
现在我们有 11 个线程,所以明显总线程数 wc(11) > corePoolSize(10),所以自然会执行下面的逻辑:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;
也就是 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):当前调用该方法的线程会阻塞一段时间(keppAliveTime 个单位),如果这段时间过后依然访问不到任务,那么下面的 timeOut = true ,getTask 方法返回 null。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
Java 线程池源码解析(2)https://developer.aliyun.com/article/1534180