从实战到原理,线程池的各类使用场景整合(中)

简介: 从实战到原理,线程池的各类使用场景整合(中)

线程池内部的源代码分析


我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池,例如:


@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}


ThreadPoolExecutor的父类是AbstractExecutorService,然后AbstractExecutorService的顶层接口是:ExecutorService。


就例如发送邮件接口而言,当线程池触发了submit函数的时候,实际上会调用到父类AbstractExecutorService对象的java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)方法,然后进入到ThreadPoolExecutor#execute部分。


@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s \n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sended success");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}


java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:


/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。因此如果有时候如果需要判断线程池执行任务的结果话,可以这样操作:


Future future = emailTaskPool.submit(() -> {
          try {
              System.out.printf("sending email .... emailDto is %s \n", emailDTO);
              Thread.sleep(1000);
              System.out.println("sended success");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      });
      //todo something
      future.get();
}


在jdk8源代码中,提交任务的执行逻辑部分如下所示:新增线程任务的时候代码:


public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }


通过深入阅读工作线程主要存放在了一个hashset集合当中, 添加工作线程部分的逻辑代

码如下所示:


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //确保当前线程池没有进入到一个销毁状态中
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
              // 如果传入的core属性是false,则这里需要比对maximumPoolSize参数
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                //通过cas操作去增加线程池的工作线程数亩
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       //真正需要指定的任务是firstTask,它会被注入到worker对象当中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
        //加入了锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashset集合,会往里面新增工作线程    
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //worker本身是一个线程,但是worker对象内部还有一个线程的参数,
                //这个t才是真正的任务内容
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果worker线程创建好了,但是内部的真正任务还没有启动,此时突然整个
        //线程池的状态被关闭了,那么这时候workerStarted就会为false,然后将
        //工作线程的数目做自减调整。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


进过理解之后,整体执行的逻辑以及先后顺序如下图所示:


image.png


首先判断线程池内部的现场是否都有任务需要执行。如果不是,则使用一个空闲的工作线程用于任务执行。否则会判断当前的堵塞队列是否已经满了,如果没有满则往队列里面投递任务,等待工作线程去处理。


如果堵塞队列已经满了,此时会判断工作线程数是否大于最大线程数,如果没有,则继续创建工作线程,如果已经达到则根据饱和策略去判断是果断抛出异常还是其他方式来进行处理。

相关文章
|
1月前
|
存储 缓存 监控
什么是线程池?它的工作原理?
我是小假 期待与你的下一次相遇 ~
180 1
|
1月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
196 1
|
5月前
|
存储 SQL 安全
Java 无锁方式实现高性能线程实战操作指南
本文深入探讨了现代高并发Java应用中单例模式的实现方式,分析了传统单例(如DCL)的局限性,并提出了多种无锁实现方案。包括基于ThreadLocal的延迟初始化、VarHandle原子操作、Record不可变对象、响应式编程(Reactor)以及CDI依赖注入等实现方式。每种方案均附有代码示例及适用场景,同时通过JMH性能测试对比各实现的优劣。最后,结合实际案例设计了一个高性能配置中心,展示了无锁单例在实际开发中的应用。总结中提出根据场景选择合适的实现方式,并遵循现代单例设计原则以优化性能和安全性。文中还提供了代码获取链接,便于读者实践与学习。
116 0
|
1月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
248 0
|
6月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
445 0
|
3月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
268 1
|
5月前
|
算法 Java 测试技术
深度优化OSS上传性能:多线程分片上传 vs 断点续传实战对比
本文深入解析对象存储服务(OSS)文件上传性能优化技术,重点探讨多线程分片上传与断点续传两种方案。通过理论分析、代码实现和性能测试,对比其在不同场景下的表现差异,并提供选型建议与最佳实践,助力提升大文件上传效率与稳定性。
533 0
|
5月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
256 0

热门文章

最新文章