线程池

简介: 线程池通过复用线程提升性能,避免频繁创建销毁的开销。Java中由Executor框架实现,核心为ThreadPoolExecutor,管理线程生命周期与任务调度。通过Executors工厂创建,支持提交异步任务、定时执行等。关键组件包括工作队列、线程工厂与拒绝策略,实现高效并发控制。(238字)

一、线程池初探
所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的关键在于它为我们管理了多个线程,我们不需要关心如何创建线程,我们只需要关系我们的核心业务,然后需要线程来执行任务的时候从线程池中获取线程。任务执行完之后线程不会被销毁,而是会被重新放到池子里面,等待机会去执行任务。
我们为什么需要线程池呢?首先一点是线程池为我们提高了一种简易的多线程编程方案,我们不需要投入太多的精力去管理多个线程,线程池会自动帮我们管理好,它知道什么时候该做什么事情,我们只要在需要的时候去获取就可以了。其次,我们使用线程池很大程度上归咎于创建和销毁线程的代价是非常昂贵的,甚至我们创建和销毁线程的资源要比我们实际执行的任务所花费的时间还要长,这显然是不科学也是不合理的,而且如果没有一个合理的管理者,可能会出现创建了过多的线程的情况,也就是在JVM中存活的线程过多,而存活着的线程也是需要销毁资源的,另外一点,过多的线程可能会造成线程过度切换的尴尬境地。
对线程池有了一个初步的认识之后,我们来看看如何使用线程池。

  1. 创建一个线程池

ExecutorService executorService = Executors.newFixedThreadPool(1);

  1. 提交任务

executorService.submit(() -> System.out.println("run"));
Future stringFuture = executorService.submit(() -> "run");

  1. 创建一个调度线程池

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  1. 提交一个周期性执行的任务

scheduledExecutorService
.scheduleAtFixedRate(() -> System.out.println("schedule"), 0, 1, TimeUnit.SECONDS);

  1. shutdown

executorService.shutdownNow();
scheduledExecutorService.shutdownNow();

可以发现使用线程池非常简单,只需要极少的代码就可以创建出我们需要的线程池,然后将我们的任务提交到线程池中去。我们只需要在结束之时记得关闭线程池就可以了。本文的重点并非在于如何使用线程池,而是试图剖析线程池的实现,比如一个调度线程池是怎么实现的?是靠什么实现的?为什么能这样实现等等问题。
二、Java线程池实现架构
Java中与线程池相关的类有下面一些:
● Executor
● ExecutorService
● ScheduledExecutorService
● ThreadPoolExecutor
● ScheduledThreadPoolExecutor
● Executors
通过上面一节中的使用示例,可以发现Executors类是一个创建线程池的有用的类,事实上,Executors类的角色也就是创建线程池,它是一个工厂类,可以产生不同类型的线程池,而Executor是线程池的鼻祖类,它有两个子类是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor则是真正的线程池,我们的任务将被这两个类交由其所管理者的线程池运行,可以发现,ScheduledThreadPoolExecutor是一个集大成者类,下面我们可以看看它的类关系图:

ScheduledThreadPoolExecutor的类关系图

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ThreadPoolExecutor实现了一般的线程池,没有调度功能,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的实现,然后增加了调度功能。
ScheduledThreadPoolExecutor相较于ThreadPoolExecutor增加了调度功能
最为原始的Executor只有一个方法execute,它接受一个Runnable类型的参数,意思是使用线程池来执行这个Runnable,可以发现Executor不提供有返回值的任务。ExecutorService继承了Executor,并且极大的增强了Executor的功能,不仅支持有返回值的任务执行,而且还有很多十分有用的方法来为你提供服务。

Executor不提供有返回值的任务,ExecutorService继承自Executor,支持有返回值的任务执行

下面展示了ExecutorService提供的方法:

ExecutorService提供的方法

ScheduledExecutorService继承了ExecutorService,并且增加了特有的调度(schedule)功能。关于Executor、ExecutorService和ScheduledExecutorService的关系,可以见下图:

Executor、ExecutorService和ScheduledExecutorService的关系
总结一下,经过我们的调研,可以发现其实对于我们编写多线程代码来说,最为核心的是Executors类,根据我们是需要ExecutorService类型的线程池还是ScheduledExecutorService类型的线程池调用相应的工厂方法就可以了,而ExecutorService的实现表现在ThreadPoolExecutor上,ScheduledExecutorService的实现则表现在ScheduledThreadPoolExecutor上,下文将分别剖析这两者,尝试弄清楚线程池的原理。
三、ThreadPoolExecutor解析
上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:

ThreadPoolExecutor的类图
下面是几个比较关键的类成员:

// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private final BlockingQueue workQueue;

//任务的执行值集合,来消费workQueue里面的任务
private final HashSet workers = new HashSet();

//线程工厂
private volatile ThreadFactory threadFactory;

//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
private volatile RejectedExecutionHandler handler;

1、CallerRunsPolicy:在调用者线程里面运行该任务
2、DiscardPolicy:丢弃任务
3、DiscardOldestPolicy:丢弃workQueue的头部任务

//最下保活work数量
private volatile int corePoolSize;

//work上限
private volatile int maximumPoolSize;

我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。

step 1:
Future<?> submit(Runnable task);

step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
        reject(command); //还不行?拒绝提交的任务
}

step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core) 


step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包装任务
final Thread t = w.thread; //获取线程(包含任务)
workers.add(w);   // 任务被放到works中
t.start(); //执行任务

上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。

worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:

final Thread thread;

Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面

thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:

public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
编译前 编译后 while (1); mov eax,1 test eax,eax je foo+23h jmp foo+18h
编译前 编译后 for (;;); jmp foo+23h   

对比之下,for (;;)指令少,不占用寄存器,而且没有判断跳转,比while (1)好。
也就是说两者在在宏观上完全一样的逻辑,但是底层完全不一样,for相对于来说更加简洁明了

相关文章
|
2天前
|
存储 监控 Java
整合切面,参数拦截+过滤
该类基于Spring AOP实现请求参数日志记录,通过`@Before`、`@Around`和`@After`切面拦截Controller层方法,自动记录请求来源、URL、方式、参数及执行耗时,便于调试与监控,日志通过LogProxy输出,提升系统可观测性。(238字)
|
1天前
|
XML SQL 监控
整合Logback,滚动记录+多文件
`logback-spring.xml` 配置了多模块日志分离输出,按类别将支付、任务、SQL、错误等日志写入不同文件,支持滚动策略与UTF-8编码。通过 `LogProxy.getLogger(&quot;LOG_NAME&quot;)` 获取指定日志器,实现精准日志记录,便于问题追踪与系统监控。(236字符)
|
1天前
|
JSON Java Maven
SpringBoot使用汇总
Spring Boot是Spring框架的延伸,旨在简化Spring应用的初始搭建与开发过程。它通过自动配置、内嵌服务器、开箱即用的依赖等方式,极大减少了项目配置和编码量,提升开发效率。支持快速构建微服务,是Java EE开发的主流趋势。
|
1天前
|
SQL Java 关系型数据库
分页
本文介绍了五种分页实现方式:MyBatis自带RowBounds内存分页、PageHelper插件分页、SQL原生分页、数组分页及拦截器分页。对比了逻辑分页(内存处理)与物理分页(数据库层处理)的优劣,指出大数据量下应优先选用物理分页以避免内存溢出,提升性能。
|
1天前
|
XML JSON Java
映射关系(1-1 1-n n-n)
MyBatis中通过resultMap实现关联映射:一对一使用resultMap解决字段与属性名不一致;一对多在“一”方配置&lt;collection&gt;,如用户包含多个角色;多对一通过&lt;association&gt;关联,如博客关联作者;多对多借助中间类,双方均用&lt;collection&gt;维护集合关系。
|
1天前
|
存储 NoSQL Linux
MongoDB单机部署
提供Win32/64位MongoDB安装包,支持命令行或配置文件启动,Linux与Windows系统均可部署。建议选择y为偶数的稳定版本,通过官网下载并解压,配置data目录及mongod.conf,使用mongod启动服务,mongo命令连接。可选Compass图形化工具管理数据库。注意端口、路径格式与防火墙设置。
|
2天前
|
存储 安全 Java
Java泛型类型擦除以及类型擦除带来的问题
Java泛型在编译时会进行类型擦除,所有泛型信息被移除,替换为原始类型(如Object或限定类型)。例如,List&lt;String&gt;和List&lt;Integer&gt;在运行时均为List,导致无法通过instanceof判断泛型类型。类型检查在编译期完成,基于引用而非实际对象。擦除后,编译器自动插入强制转换保证类型安全。但这也引发多态冲突、静态成员限制等问题,需通过桥方法等机制解决。基本类型不能作为泛型参数,静态上下文中也不能使用类级别泛型参数。
|
1天前
|
Java
常见加载顺序
本示例展示了Java中各类代码块的执行顺序:静态代码块随类加载仅执行一次,优先于主函数;局部代码块在方法内直接运行;构造代码块每次创建对象前自动执行,早于构造器。输出结果体现三者优先级:静态 &gt; 局部 &gt; 构造。
|
2天前
|
Java 大数据
ArrayList扩容机制
ArrayList添加元素时,先调用ensureCapacityInternal()确保容量,首次添加时默认扩容至10。每次扩容通过grow()实现,新容量为原容量的1.5倍(oldCapacity + (oldCapacity &gt;&gt; 1)),提升性能。add第11个元素时再次触发扩容。length为数组属性,length()是字符串方法,size()用于集合获取元素数。
|
1天前
|
自然语言处理 fastjson Java
大面积故障规避案例
本文记录了一次由Kotlin语法误用引发的FastJson反序列化全局异常问题。因混编环境下将`{}`误赋值给Java对象字段,导致FastJson解析时触发静态标记位`kotlin_error`,进而使整个应用反序列化失效。排查耗时两天,揭示了多语言混编、框架兼容性及静态状态风险等深层问题,值得开发者警惕。