Java 多线程:线程池实现原理

简介:

前言

我们都知道,所谓线程池,那么就是相当于有一个池子,线程就放在这个池子中进行重复利用,能够减去了线程的创建和销毁所带来的代价。但是这样并不能很好的解释线程池的原理,下面从代码的角度分析一下线程池的实现。

线程池的相关类

对于原理,在 Java 中,有几个接口,类 值得我们关注:

  • Executor

  • ExecutorService

  • AbstractExecutorService

  • ThreadPoolExecutor

Executor

public interface Executor {
    void execute(Runnable command);
}


Executor 接口只有一个 方法,execute,并且需要 传入一个 Runnable 类型的参数。 那么它的作用自然是 具体的执行参数传入的任务。

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    ......
}


ExecutorService 接口继承了 Executor,并且提供了一些其他的方法,比如说:

  • shutdownNow : 关闭线程池,返回放入了线程池,但是还没开始执行的线程。

  • submit : 执行的任务 允许拥有返回值。

  • invokeAll : 运行把任务放进集合中,进行批量的执行,并且能有返回值

这三个方法也可以说是这个接口重点扩展的方法。

Ps:execute 和 submit 区别:

  • submit 有返回值,execute 没有返回值。 所以说可以根据任务有无返回值选择对应的方法。

  • submit 方便异常的处理。 如果任务可能会抛出异常,而且希望外面的调用者能够感知这些异常,那么就需要调用 submit 方法,通过捕获 Future.get 抛出的异常。

AbstractExecutorService

AbstractExecutorService 是一个抽象类, 主要完成了 对 submit 方法,invokeAll 方法 的实现。 但是其实它的内部还是调用了 execute 方法,例如:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }


ThreadPoolExecutor

ThreadPoolExecutor 继承了 AbstractExecutorService, 并且实现了最重要的 execute 方法 ,是我们主要需要研究的类。另外,整个线程池是如何实现的呢?

在该类中,有两个成员变量 非常的重要:

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;


对于 workers 变量,主要存在了线程对象 Worker,Worker 实现了 Runnable 接口。而对于 workQueue 变量,主要存放了需要执行的任务。 这样其实可以猜到,  整个线程池的实现原理应该是 workQueue 中不断的取出需要执行的任务,放在 workers 中进行处理。  

另外,当线程池中的线程用完了之后,多余的任务会等待,那么这个等待的过程是 怎么实现的呢? 其实如果熟悉 BlockingQueue,那么就会马上知道, 是利用了 BlockingQueue 的take 方法进行处理 

下面具体代码分析:

public void execute(Runnable command) {
        ......
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        ......
    }


首先,这里需要先理解两个概念。我们在创建线程池的时候,通常会指定两个变量,一个是maximumPoolSize,另外一个是 corePoolSize。

  • 对于 maximumPoolSize:指的是 线程池中最多允许有多少个线程。

  • 对于 corePoolSize: 指的是线程池中正在运行的线程。

在 线程池中,有这样的设定,我们加入一个任务进行执行,

  • 如果现在线程池中正在运行的线程数量大于 corePoolSize 指定的值而 小于maximumPoolSize 指定的值,那么就会创建一个线程对该任务进行执行,一旦一个线程被创建运行。

  • 如果线程池中的线程数量大于corePoolSize,那么这个任务执行完毕后,该线程会被回收;如果 小于corePoolSize,那么该线程即使空闲,也不会被回收。下个任务过来,那么就使用这个空闲线程。

对于上述代码,首先有:

if (workerCountOf(c) &lt; corePoolSize)

也就是说,判断现在的线程数量是否小于corePoolSize,如果小于,那么就创建一个线程执行该任务,也就是执行

addWorker(command, true)

如果大于,那么就把该任务放进队列当中,即

workQueue.offer(command)

那么,addWorker 是干什么的呢?

private boolean addWorker(Runnable firstTask, boolean core) {    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
             ......
    }


在这里可以看到一些关键代码,例如 w = new Worker(firstTask), 以及 workers.add(w); 从这里 我们就可以看到,创建 线程对象 并且加入到 线程 队列中。但是,我们现在还没有看到具体是怎么执行任务的,继续追踪 w = new Worker(firstTask),如下代码:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        ......

        final Thread thread;

        Runnable firstTask;
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        ......


对于 runWorker 方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } 
                ......
    }


在这段代码中,就有很多关键的信息,比如说,Runnable task = w.firstTask;如果为空,那么就 执行 task = getTask(),如果不为空,那么就 进行 task.run(); 调用其方法, 这里也就是具体的执行的任务 

现在知道了是怎么样执行具体的任务,那么假如任务的数量 大于 线程池的数量,那么是怎么实现等待的呢,这里就需要看到getTask() 的具体实现了,如下:

private Runnable getTask() {
        for (;;) {
           ......
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


这里可以看到, 一个 for 死循环,以及

Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

而 workQueue 是 BlockingQueue 类型,也就是带有阻塞的功能。

这就是 线程如何等待执行的。

总结

现在就可以知道,大致的线程池实现原理:

首先,各自存放线程和任务,其中,任务带有阻塞。

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;


然后,在 execute 方法中 进行 addWorker(command,true),也就是创建一个线程,把任务放进去执行;或者是直接把任务放入到任务队列中。

接着 如果是 addWorker,那么就会 new Worker(task) -》调用其中 run() 方法,在Worker 的run() 方法中,调用 runWorker(this); 方法 -》在该方法中就会具体执行我们的任务 task.run(); 同时这个 runWorker方法相当于是个死循环,正常情况下就会一直取出 任务队列中的任务来执行,这就保证了线程 不会销毁。

所以,这也是为什么常说的线程池可以避免线程的频繁创建和 销毁带来的性能消耗。

写在最后

  1. 写出来,说出来才知道对不对,知道不对才能改正,改正了才能成长。

  2. 在技术方面,希望大家眼里都容不得沙子。如果有不对的地方或者需要改进的地方希望可以指出,万分感谢。

本文转自 www19 51CTO博客,原文链接:http://blog.51cto.com/doujh/1825327,如需转载请自行联系原作者
相关文章
|
9月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
384 0
|
11月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
403 0
|
6月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
377 2
|
9月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
10月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
702 5
|
10月前
|
监控 搜索推荐 Java
Java 多线程最新实操技术与应用场景全解析:从基础到进阶
本文深入探讨了Java多线程的现代并发编程技术,涵盖Java 8+新特性,如CompletableFuture异步处理、Stream并行流操作,以及Reactive编程中的Reactor框架。通过具体代码示例,讲解了异步任务组合、并行流优化及响应式编程的核心概念(Flux与Mono)。同时对比了同步、CompletableFuture和Reactor三种实现方式的性能,并总结了最佳实践,帮助开发者构建高效、扩展性强的应用。资源地址:[点击下载](https://pan.quark.cn/s/14fcf913bae6)。
531 3
|
11月前
|
算法 Java 调度
Java多线程基础
本文主要讲解多线程相关知识,分为两部分。第一部分涵盖多线程概念(并发与并行、进程与线程)、Java程序运行原理(JVM启动多线程特性)、实现多线程的两种方式(继承Thread类与实现Runnable接口)及其区别。第二部分涉及线程同步(同步锁的应用场景与代码示例)及线程间通信(wait()与notify()方法的使用)。通过多个Demo代码实例,深入浅出地解析多线程的核心知识点,帮助读者掌握其实现与应用技巧。
177 1
|
11月前
|
Java
java 多线程异常处理
本文介绍了Java中ThreadGroup的异常处理机制,重点讲解UncaughtExceptionHandler的使用。通过示例代码展示了当线程的run()方法抛出未捕获异常时,JVM如何依次查找并调用线程的异常处理器、线程组的uncaughtException方法或默认异常处理器。文章还提供了具体代码和输出结果,帮助理解不同处理器的优先级与执行逻辑。
233 1
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
393 155
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
657 1
下一篇
开通oss服务