java线程池使用(二)------部分源码解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 简单介绍java线程池源码

前一篇博客介绍了构造参数分别是什么意思
现在介绍java线程池源码
1 init
线程池初始化,代码比较简单

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || //核心线程数校验
            maximumPoolSize <= 0 || //非核心线程数校验
            maximumPoolSize < corePoolSize || 
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null) //必传参数校验
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

2 线程池核心类Worker
线城池中,使用Worker来保证线程池线程的活跃,保证在没有任务执行时,也能保证线程资源不被释放掉
woker类实现了runable接口,本身就是一个线程

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        //worker运行的线程,是线程池中实际的线程,通过ThreadFactory生成
        final Thread thread;
       //线程池新增线程时的默认任务,可以为空
        Runnable firstTask;
        //已经执行过的线程任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //将执行方法委托给ThreadPoolExecuter
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

3 线程池新增任务
image

下面看源码
核心方法1 ThreadPoolExecuter.execute()


  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
         
        int c = ctl.get();
        //判断当前线程数量和最大核心线程数
        if (workerCountOf(c) < corePoolSize) {//线程数<最大核心线程数
            if (addWorker(command, true))//尝试创建新的worker,如果创建成功则直接返回
                return;
            c = ctl.get();//获取最新的的数量
        }
        if (isRunning(c) && workQueue.offer(command)) {//1 检查线程池状态
                                                        //2当前线程数>=最大核心线程数。
                                                        //3尝试将新增的线程放入blockingqueue
            int recheck = ctl.get(); //对线程池状态进行二次校验,防止其他线程在offer期间对线程池状态进行了修改
            if (! isRunning(recheck) && remove(command)) //二次校验,失败后将新增线程从blockingqueue移除
                                                         //二次校验成功则不移除,利用java&&的特性
                reject(command);                        //校验失败进入过载策略
            else if (workerCountOf(recheck) == 0) //二次校验成功,判断当前线程池线程数量是否归零,
                                                   //如果归零,则创建新的空worker,由worker去进行处理queue
                                                    //中的任务
               addWorker(null, false);
        }
        else if (!addWorker(command, false))//如果queue.offer返回false,证明queue已经满了,
                                           // 尝试创建非核心线程
            reject(command);  //创建失败则证明已经到达最大线程数,进去过载策略
    }

2 核心方法二 ThreadPoolExecuter.addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取当前线程池状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
相关文章
|
14天前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
12天前
|
Kubernetes jenkins 持续交付
从代码到k8s部署应有尽有系列-java源码之String详解
本文详细介绍了一个基于 `gitlab + jenkins + harbor + k8s` 的自动化部署环境搭建流程。其中,`gitlab` 用于代码托管和 CI,`jenkins` 负责 CD 发布,`harbor` 作为镜像仓库,而 `k8s` 则用于运行服务。文章具体介绍了每项工具的部署步骤,并提供了详细的配置信息和示例代码。此外,还特别指出中间件(如 MySQL、Redis 等)应部署在 K8s 之外,以确保服务稳定性和独立性。通过本文,读者可以学习如何在本地环境中搭建一套完整的自动化部署系统。
39 0
|
2天前
|
开发工具
Flutter-AnimatedWidget组件源码解析
Flutter-AnimatedWidget组件源码解析
|
13天前
|
Java API
Java 8新特性:Lambda表达式与Stream API的深度解析
【7月更文挑战第61天】本文将深入探讨Java 8中的两个重要特性:Lambda表达式和Stream API。我们将首先介绍Lambda表达式的基本概念和语法,然后详细解析Stream API的使用和优势。最后,我们将通过实例代码演示如何结合使用Lambda表达式和Stream API,以提高Java编程的效率和可读性。
|
14天前
|
Java 开发者 UED
“Java开发者必看:异步编程实战解析,掌握这些技巧,让你的代码跑得更快!
【8月更文挑战第30天】随着互联网技术的发展,系统性能和用户体验成为关注焦点。异步编程作为提高应用响应速度和吞吐量的技术,在Java中广泛采用。本文详细介绍了Java异步编程的概念与优势,并通过实战示例展示了如何利用Future、Callable及CompletableFuture在实际项目中实施异步编程,帮助开发者更好地理解和应用这一技术。
29 2
|
14天前
|
存储 算法 Java
Java中的集合框架深度解析云上守护:云计算与网络安全的协同进化
【8月更文挑战第29天】在Java的世界中,集合框架是数据结构的代言人。它不仅让数据存储变得优雅而高效,还为程序员提供了一套丰富的工具箱。本文将带你深入理解集合框架的设计哲学,探索其背后的原理,并分享一些实用的使用技巧。无论你是初学者还是资深开发者,这篇文章都将为你打开一扇通往高效编程的大门。
|
安全 Java
Java并发编程笔记之CopyOnWriteArrayList源码分析
并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。
19538 0
|
Java 安全
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析
我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。
3114 0
|
Java
Java并发编程笔记之FutureTask源码分析
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
4280 0
|
Java 调度 API
Java并发编程笔记之Timer源码分析
timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢?   Timer只创建唯一的线程来执行所有Timer任务。
2989 0

热门文章

最新文章

推荐镜像

更多