我是陈皮,一个在互联网 Coding 的 ITer,个人微信公众号「陈皮的JavaLib」关注第一时间阅读最新文章。
线程池简介
多线程并发执行可以提高程序的性能。特别是在多核处理器的环境下,多线程程序能发挥多核处理器的优势性能。
虽然与进程相比,线程轻量化很多,但其创建和关闭同样需要花费时间。而且线程多了以后,也会抢占内存资源。如果不对线程加以管理,是一个很大的隐患。而线程池的目的就是管理线程。当需要一个线程时,可以从线程池拿一个空闲线程去执行任务,当任务执行完后,线程又会归还到线程池。这样就有效的避免了重复创建、关闭线程和线程数量过多带来的问题。
Java 并发包java.util.concurrent
提供了线程池功能,以下是一些相关接口,类的关系。
线程池核心类 ThreadPoolExecutor
ThreadPoolExecutor 是核心类,我们一般使用这个类的对象来提交任务和执行任务。此类有很多构造方法,如下所示。
- corePoolSize:核心线程数,这些线程即使空闲也会一直存活着,除非对核心线程设置了超时时间。
- maximumPoolSize:线程池最大线程数,即总的线程数,这其中包括了核心线程。
- keepAliveTime:当线程池的线程数量超过 corePoolSize 时,超过的非核心空闲线程在被销毁之前等待新任务的最大等待时间,即非核心线程的最大空闲时间。
- unit:参数 keepAliveTime 的时间单位。
- workQueue:任务阻塞队列,当核心线程都在执行任务时,新提交的任务就会被放到任务队列中。
- threadFactory:线程池创建新线程的线程工厂,一般使用默认的即可,除非需要定制化。
- handler:拒绝策略。所有线程都在执行任务,并且任务队列也满了,对新提交任务的处理方式。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 类中的一些变量如下所示。
// 高3位记录线程池状态,低29位记录线程池中线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 对于ctl变量,线程池数量占用比特位32-3=29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池中线程数最大值,00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 111 运行状态
private static final int RUNNING = -1 << COUNT_BITS;
// 000 关闭状态,不接受新任务,但是已提交的任务(队列中的任务,正在执行的任务)都会继续处理
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 停止状态,不接收新任务,不处理队列中的任务,中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 010 过渡状态,代表线程池即将进入终止状态
private static final int TIDYING = 2 << COUNT_BITS;
// 011 终止状态,即线程池真正被关闭了
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池中线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池执行流程
Executors 工具类
ExecutorService
接口提供一些操作线程池的方法,ThreadPoolExecutor 类是 ExecutorService 接口的实现类。Executors
相当于一个线程池工厂类,生产ExecutorService
实例(其实是 ThreadPoolExecutor 实例),它里面有几种现成的具备某种特定功能的线程池工厂方法,如下所示。
// 创建一个线程数量为10的固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
Executors工厂方法介绍:
- newSingleThreadExecutor():只有一个线程的线程池。超出的任务被放到任务队列,等这个线程空闲时就会去按顺序处理。
- newFixedThreadPool():固定线程数量线程池。传入的数字就是线程的数量,如果有空闲线程就去执行任务,如果没有空闲线程就会把任务放到一个任务队列,等到有线程空闲时再任务。
- newCachedThreadPool():可拓展的线程池。当没有空闲线程去执行新任务时,就会再创建新的线程去执行任务,执行完后新建的线程会返回线程池进行复用。
- newSingleThreadScheduledExecutor():返回 ScheduledExecutorService 对象。ScheduledExecutorService 接口继承 ExecutorService 接口,有一些拓展方法,如指定执行时间。这个线程池大小为1,在指定时间执行任务。关于指定时间的几个方法:schedule() 是在指定时间后执行一次任务。 scheduleAtFixedRate() 和 scheduleWithFixedDelay() 方法,两者都是周期性的执行任务,但是前者是以上一次任务开始为周期起点,后者是以上一次任务结束为周期起点。
- newScheduledThreadPool():和上面一个方法一样,但是可以指定线程池大小,其实上面那个方法也是调用这个方法的,只是传入的参数是1。
注意,阿里巴巴规范中不推荐使用 Executors 工具类来创建线程池,因为这种方式对线程池的控制粒度比较粗,创建线程池的大多参数都不是我们控制的。
- newFixedThreadPool:创建一个固定线程数量的线程池,阻塞队列使用的是无界任务队列。如果提交的任务量巨多的话会导致阻塞队列一直增长,占用大量内存空间,严重时会造成内存溢出。主要有以下几个问题。
- newCachedThreadPool:创建一个无上限线程数量的线程池,提交一个任务如果没有空闲线程,就创建一个线程来执行任务。如果提交的任务量巨多的话会导致创建太多的线程,会导致 CPU 100% 。
任务队列
线程池中的任务队列是一个BlockingQueue
接口,在 ThreadPoolExecutor 类中有如下几种实现类实现了 BlockingQueue 接口。
- LinkedBlockingQueue:无界任务队列,是个链表结构,不会出现任务队列满了的情况,除非内存空间不足,但是非常耗费系统资源。和有界任务队列一样,线程数若小于 corePoolSize ,新任务进来时没有空闲线程的话就会创建新线程,当达到 corePoolSize 时,就会进入任务队列。其实 maximumPoolSize 没什么作用,newFixedThreadPool 固定大小线程池就是用的这个任务队列,它的 corePoolSize 和 maximumPoolSize 相等。
- SynchronousQueue:直接提交队列。这种队列其实不会真正的去保存任务,每提交一个任务就直接让空闲线程执行,如果没有空闲线程就去新建,当达到最大线程数时,就会执行拒绝策略。所以使用这种任务队列时,一般会设置很大的maximumPoolSize,不然很容易就执行了拒绝策略。 newCachedThreadPool 线程池的 corePoolSize 为0,maximumPoolSize 无限大,它用的就是直接提交队列。
- ArrayBlockingQueue:有界任务队列,其构造函数必须带一个容量参数,表示任务队列的大小。当线程数量小于 corePoolSize 时,有任务进来优先创建线程。当线程数等于 corePoolSize 时,新任务就会进入任务队列,当任务队列满了,才会创建新线程,线程数达到 maximumPoolSize 时执行拒绝策略
- PriorityBlockingQueue:优先任务队列,它是一个特殊的无界队列,因为它总能保证高优先级的任务先执行。
拒绝策略
JDK 提供了四种拒绝策略,都实现了RejectedExecutionHandler
接口,如果这四种拒绝策略无法满足你的要求,可以自定义继承RejectedExecutionHandler 并实现 rejectedExecution 方法。
- AbortPolicy:直接抛出异常,阻止系统正常工作。JDK 默认是这种策略。
- CallerRunsPolicy:如果线程池未关闭,则在调用者线程里面执行被丢弃的任务,这个策略不是真正的拒绝任务。比如我们在 T1 线程中提交的任务,那么该拒绝策略就会把多余的任务放到 T1 线程执行,会影响到提交者线程的性能。
- DiscardOldestPolicy:该策略会丢弃一个最老的任务,也就是即将被执行的任务,然后再次尝试提交该任务。
- DiscardPolicy:直接丢弃当前提交的任务,不做任何处理,如果允许丢弃任务,这个策略是最好的。
线程工厂
线程池中的线程是由`ThreadFactory
负责创建的,一般情况下默认就行,如果有一些其他的需求,比如自定义线程的名称、优先级等,则可实现 ThreadFactory 接口来自定义自己的线程工厂。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
以下是 Executors 类中定义的默认线程工厂类。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
线程池扩展
ThreadPoolExecutor 类中有三个扩展方法:
- beforeExecute:在任务执行前执行。
- Execute:任务执行后执行。
- terminated:线程池退出时执行。
ThreadPoolExecutor 类中有一个内部类:`Worker
,每个线程的任务其实都是由这个类里面的 run 方法执行的。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// ...省略
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...省略
}
runWorker 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前执行该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行后执行该方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
还有一个线程池退出时执行的方法是在何处执行的?这个方法被调用的地方就不止一处了,像线程池的 shutdown 方法就会调用,例如 ThreadPoolExecutor 类的 shutdown 方法如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 线程池退出时执行
tryTerminate();
}
ThreadPoolExecutor 中这三个方法默认是没有任何内容的:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
我们也可以自定义并重写他们,例如继承 ThreadPoolExecutor 并重写这三个方法:
ExecutorService threadpool = new ThreadPoolExecutor(3, 10, 3L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 执行任务前
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 执行任务后
}
@Override
protected void terminated() {
// 线程退出
}
};
线程池工具类
最后给出一个自己封装,开箱即用的线程池工具类。
package com.chenpi;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 陈皮
* @version 1.0
* @description 线程池工具类
* @date 2020/06/02
*/
public class ThreadPoolUtils {
// 核心池大小
private static final int CORE_POOL_SIZE = 5;
// 线程池允许的最大线程数
private static final int MAXIMUM_POOL_SIZE = 10;
// 空闲的多余线程最大存活时间,单位秒
private static final int KEEP_ALIVE_TIME = 3;
// 任务阻塞队列大小
private static final int QUEUE_SIZE = 3;
// 默认线程池名称
private static final String DEFAULT_THREAD_POOL_NAME = "DEFAULT";
// 用于保存不同的线程池
private static final Map<String, ThreadPoolExecutor> executorList = new ConcurrentHashMap<>();
public static ThreadPoolExecutor getExecutor(String executorName) {
ThreadPoolExecutor executor = executorList.get(executorName);
if (executor == null) {
synchronized (ThreadPoolUtils.class) {
if (executor == null) {
executor = create(executorName);
}
}
}
return executor;
}
// 使用特定线程池
public static void execute(String executorName, Runnable command) {
getExecutor(executorName).execute(command);
}
// 使用默认线程池
public static void execute(Runnable command) {
getExecutor(DEFAULT_THREAD_POOL_NAME).execute(command);
}
// 使用特定线程池
public static <T> Future<T> submit(String executorName, Callable<T> command) {
return getExecutor(executorName).submit(command);
}
// 使用默认线程池
public static <T> Future<T> submit(Callable<T> command) {
return getExecutor(DEFAULT_THREAD_POOL_NAME).submit(command);
}
public static void shutdown(String executorName) {
if (null == executorName || executorName.length() == 0) {
executorName = DEFAULT_THREAD_POOL_NAME;
}
getExecutor(executorName).shutdown();
}
// 如果executorList中没有指定名称的线程池,则进行创建
private static ThreadPoolExecutor create(String executorName) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadPoolExecutor.CallerRunsPolicy());
executorList.put(executorName, executor);
return executor;
}
}
以下为调用示例。
package com.chenpi;
import java.util.concurrent.TimeUnit;
/**
* @author 陈皮
* @version 1.0
* @description
* @date 2022/3/14
*/
public class ChenPi {
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
final int fi = i;
ThreadPoolUtils.execute(() -> {
// 自定义当前线程未捕获异常处理
Thread.currentThread().setUncaughtExceptionHandler(
(t, e) -> System.out.println(t.getName() + ",Throwable:" + e));
try {
TimeUnit.SECONDS.sleep((long) (Math.random() * 5));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行完任务" + fi);
});
}
// 关闭线程池
ThreadPoolUtils.shutdown(null);
}
}
输出结果如下。
pool-1-thread-2 执行完任务1
pool-1-thread-10 执行完任务12
pool-1-thread-6 执行完任务8
pool-1-thread-10 执行完任务6
main 执行完任务13
pool-1-thread-3 执行完任务2
pool-1-thread-4 执行完任务3
pool-1-thread-10 执行完任务14
pool-1-thread-1 执行完任务0
pool-1-thread-8 执行完任务10
pool-1-thread-9 执行完任务11
pool-1-thread-7 执行完任务9
pool-1-thread-5 执行完任务4
pool-1-thread-2 执行完任务5
pool-1-thread-6 执行完任务7
pool-1-thread-3 执行完任务15
main 执行完任务17
pool-1-thread-4 执行完任务16
pool-1-thread-10 执行完任务18
pool-1-thread-1 执行完任务19
本次分享到此结束啦~~
如果觉得文章对你有帮助,点赞、收藏、关注、评论,您的支持就是我创作最大的动力!