开发者社区> yanglbme> 正文

手撕源码!线程池核心组件源码剖析

简介: 看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。
+关注继续查看

本文选自 Doocs 开源社区旗下“源码猎人”项目,作者 AmyliaY。


项目将会持续更新,欢迎 Star 关注。


项目地址:https://github.com/doocs/source-code-hunter


线程池核心组件图解


看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。


32.png


该组件中,Executor 和 ExecutorService 接口 定义了线程池最核心的几个方法,提交任务 submit ()、关闭线程池 shutdown()。抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor 接口 的方法。实现类 ThreadPoolExecutor 则对线程池进行了具体而复杂的实现。


另外还有一个常见的工具类 Executors,里面为开发者封装了一些可以直接拿来用的线程池。


源码赏析


话不多说,直接上源码。(这里只看最主要的代码部分)


Executor 和 ExecutorService 接口


public interface Executor {    /**     * 在将来的某个时间执行给定的 Runnable。该 Runnable 可以在新线程、池线程或调用线程中执行。     */    void execute(Runnable command);}public interface ExecutorService extends Executor {    /**     * 优雅关闭,该关闭会继续执行完以前提交的任务,但不再接受新任务。     */    void shutdown();    /**     * 提交一个有返回值的任务,并返回该任务的 未来执行完成后的结果。     * Future的 get()方法 将在成功完成后返回任务的结果。     */    <T> Future<T> submit(Callable<T> task);    <T> Future<T> submit(Runnable task, T result);    Future<?> submit(Runnable task);}


AbstractExecutorService 抽象类


/** * 该抽象类最主要的内容就是,实现了 ExecutorService 中的 submit()系列方法 */public abstract class AbstractExecutorService implements ExecutorService {    /**     * 提交任务 进行执行,返回获取未来结果的 Future对象。     * 这里使用了 “模板方法模式”,execute()方法来自 Executor接口,该抽象类中并未进行实现,     * 而是交由子类具体实现。     */    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Runnable task, T result) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task, result);        execute(ftask);        return ftask;    }    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }}


ThreadPoolExecutor


public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * **************     * ** 主要属性 **     * **************     */    /** 阻塞队列 */    private final BlockingQueue<Runnable> workQueue;    /** 用于创建线程的 线程工厂 */    private volatile ThreadFactory threadFactory;    /** 核心线程数 */    private volatile int corePoolSize;    /** 最大线程数 */    private volatile int maximumPoolSize;    /**     * **************     * ** 构造方法 **     * **************     */    /** 最后都使用了最后一个构造方法的实现 */    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             threadFactory, defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }    /**     * **************     * ** 主要实现 **     * **************     */    /** 执行 Runnable任务 */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * 分三步进行:         *         * 1、如果运行的线程少于 corePoolSize,尝试开启一个新的线程;否则尝试进入工作队列         *         * 2. 如果工作队列没满,则进入工作队列;否则 判断是否超出最大线程数         *         * 3. 如果未超出最大线程数,则尝试开启一个新的线程;否则 按饱和策略处理无法执行的任务         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }    /**     * 优雅关闭,在其中执行以前提交的任务,但不接受新任务。如果已关闭,则调用没有其他效果。     */    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();    }}


ThreadPoolExecutor 中的 execute()方法 执行 Runnable 任务 的流程逻辑可以用下图表示。


33.png


工具类 Executors


看类名也知道,它最主要的作用就是提供 static 的工具方法,为开发者提供各种封装好的 具有各自特性的线程池。


public class Executors {    /**     * 创建一个固定线程数量的线程池     */    public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }    /**     * 创建一个单线程的线程池     */    public static ExecutorService newSingleThreadExecutor() {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>()));    }    /**     * 创建一个缓存的,可动态伸缩的线程池。     * 可以看出来:核心线程数为0,最大线程数为Integer.MAX_VALUE,如果任务数在某一瞬间暴涨,     * 这个线程池很可能会把 服务器撑爆。     * 另外需要注意的是,它们底层都是使用了 ThreadPoolExecutor,只不过帮我们配好了参数     */    public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }}


全文完!


希望本文对大家有所帮助。如果感觉本文有帮助,有劳转发或点一下“在看”!让更多人收获知识!


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
如何设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云安全组设置详细图文教程(收藏起来) 阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程。阿里云会要求客户设置安全组,如果不设置,阿里云会指定默认的安全组。那么,这个安全组是什么呢?顾名思义,就是为了服务器安全设置的。安全组其实就是一个虚拟的防火墙,可以让用户从端口、IP的维度来筛选对应服务器的访问者,从而形成一个云上的安全域。
19821 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
29203 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
22541 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
20715 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
16479 0
腾讯云服务器 设置ngxin + fastdfs +tomcat 开机自启动
在tomcat中新建一个可以启动的 .sh 脚本文件 /usr/local/tomcat7/bin/ export JAVA_HOME=/usr/local/java/jdk7 export PATH=$JAVA_HOME/bin/:$PATH export CLASSPATH=.
14904 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
23588 0
+关注
73
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载