Java由浅入深理解线程池设计和原理2

简介: Java由浅入深理解线程池设计和原理2

5.3 掌握线程池个参数定义

/**
 * @param corePoolSize 池中要保留的线程数
 * @param maximumPoolSize 中允许的最大线程数,前提是队列先满
 * @param keepAliveTime 当线程数大于核心,这是多余空闲线程的最长时间将在终止之前等待新任务。
 * @param keepAliveTime参数的时间单位
 * @param workQueue 用于保存任务的队列
 * @param threadFactory 执行器创建新线程的工厂
 * @param handler 阻止执行时要使用的处理程序,因为达到了线程边界和队列容量
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

5.4 线程池结构说明

在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。

(源码查看:两个集合,一个queue,一个hashset)

5.5 线程池的任务提交

  • 添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行
  • 达到core,放入queue
  • queue已满,未达到maxSize继续创建线程
  • 达到maxSize,根据reject策略处理
  • 超时后,线程被释放,下降到coreSize

5.6 线程池工具类Executors

  • newCachedThreadPool() : 弹性线程数
  • newFixedThreadPool(int nThreads) : 固定线程数
  • newSingleThreadExecutor() : 单一线程数
  • newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时

5.7 确定线程池的线程数

虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。

按照任务类型对线程池进行分类:

(1)IO密集型任务

此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写 操作为此类任务的典型例子。

(2)CPU密集型任务

此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU的利用率很高。

(3)混合型任务

此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往 返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。一般情况 下,针对以上不同类型的异步任务需要创建不同类型的线程池,并进行针对性的参数配置。

5.7.1 为IO密集型任务确定线程数

由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用 其他线程继续使用CPU,以提高CPU的使用率。Netty的IO处理任务就是典型的IO密集型任务。所以,Netty的Reactor(反应器)实 现类(定制版的线程池)的IO处理线程数默认正好为CPU核数的两倍

5.7.2 为CPU密集型任务确定线程数

CPU密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗CPU资源,比如计算圆周率、对视频进行高清解码 等。CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要 最高效地利用CPU,CPU密集型任务并行执行的数量应当等于CPU的核心数。

比如4个核心的CPU,通过4个线程并行地执行4个CPU密集型任务,此时的效率是最高的。但是如果线程数远远超出CPU核 心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。因此,对于CPU密集型的任务来说,线 程数等于CPU数就行。

5.7.3 为混合型任务确定线程数

混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的数倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间。在为混合型任务创建线程池时,如何确定线程数呢?业界有一个比较成熟的估算公式,具体如下:

最佳线程数 = ((线程等待时间+线程CPU时间) / 线程CPU时间) * CPU核数

通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。下面举一个例子:比如在Web服务器处理HTTP请求时,假设平均线程CPU运行时间为100毫秒,而线程等待时间(比如包括DB操作、RPC操作、缓存操作等)为900毫秒,如果CPU核数为8,那么根据上面这个公式,估算如下:

(900毫秒 + 100毫秒) / 100毫秒 * 8 = 10 * 8 = 80

5.8 线程池源码刨析

//任务提交阶段:(4个if条件路线)
public void execute(Runnable command) {
  if (command == null)
            throw new NullPointerException();
  int c = ctl.get();
  //判断工作数,如果小于coreSize,addWork,注意第二个参数core=true
  if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
          return;
      c = ctl.get();
  }
  //否则,如果线程池还在运行,offer到队列
  if (isRunning(c) && workQueue.offer(command)) {
      //再检查一下状态
      int recheck = ctl.get();
      //如果线程池已经终止,直接移除任务,不再响应
      if (! isRunning(recheck) && remove(command))
          reject(command);
      //否则,如果没有可用线程的话(比如coreSize=0),创建一个空work
        //该work创建时不会给指派任务(为null),但是会被放入works集合,进而从队列获取任务去执行
      else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
  }
  //队列也满,继续调addWork,但是注意,core=false,开启到maxSize的大门
  //超出max的话,addWork会返回false,进入reject
  else if (!addWorker(command, false))
      reject(command);
}
//线程创建
private boolean addWorker(Runnable firstTask, boolean core) {
    //第一步,计数判断,不符合条件打回false
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        for (;;) {
            int wc = workerCountOf(c);
            //判断线程数,注意这里!
            //也就说明线程池的线程数是不可能设置任意大的。
            //最大29位(CAPACITY=29位二进制)
            //超出规定范围,返回false,表示不允许再开启新工作线程,创建worker失败!
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //第二步,创建新work放入线程集合works(一个HashSet)
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //符合条件,创建新的work并包装task
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //加锁,workers是一个hashset,这里要保障线程安全性
            mainLock.lock();
            try {   
                        //...
                    //在这里!!!
                    workers.add(w);
                    //...                    
                    workerAdded = true;
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //注意,只要是成功add了新的work,那么将该新work立即启动,任务得到执行
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
//任务获取与执行
//在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行
while (task != null || (task = getTask()) != null)
//如果没用,会调用getTask,从队列获取任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // ...
        int wc = workerCountOf(c);
        // Are workers subject to culling? - 很形象,要不要乖乖的被“捕杀”?
        //判断是不是要超时处理,重点!!!决定了当前线程要不要被释放
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                //线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止
        //将线程队列数量原子性减
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //计数器做原子递减,递减成功后,返回null,for被中止
            if (compareAndDecrementWorkerCount(c))
                return null;
            //递减失败,继续下一轮循环,直到成功
            continue;
        }
        try {
            //重点!!!
            //如果线程可被释放,那就poll,释放的时间为:keepAliveTime
            //否则,线程是不会被释放的,take一直被阻塞在这里,直到来了新任务继续工作
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

5.9 Executors

以上构造函数比较多,为了方便使用,juc提供了一个Executors工具类,内部提供静态方法

1)newCachedThreadPool() : 弹性线程数

2)newFixedThreadPool(int nThreads) : 固定线程数

3)newSingleThreadExecutor() : 单一线程数

4)newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时

6 线程池的经典面试题

6.1 线程池是如何保证线程不被销毁的呢?

答案:如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮 work.runWork()中循环

验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()

//work.runWork():
while (task != null || (task = getTask()) != null)
//work.getTask():
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();

6.2 核心线程与非核心线程有区别吗?

答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁

验证:看源码,每个work在runWork()的时候去getTask(),在getTask内部,并没有针对性的区分当前work是否是核心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take()

6.3 线程池7个参数的作用及生效时机

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}


目录
相关文章
|
14天前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
Java 数据库 Spring
49 0
|
27天前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
61 16
|
2月前
|
缓存 并行计算 安全
关于Java多线程详解
本文深入讲解Java多线程编程,涵盖基础概念、线程创建与管理、同步机制、并发工具类、线程池、线程安全集合、实战案例及常见问题解决方案,助你掌握高性能并发编程技巧,应对多线程开发中的挑战。
|
2月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
3月前
|
监控 Java API
现代 Java IO 高性能实践从原理到落地的高效实现路径与实战指南
本文深入解析现代Java高性能IO实践,涵盖异步非阻塞IO、操作系统优化、大文件处理、响应式网络编程与数据库访问,结合Netty、Reactor等技术落地高并发应用,助力构建高效可扩展的IO系统。
90 0
|
3月前
|
存储 缓存 安全
深入讲解 Java 并发编程核心原理与应用案例
本教程全面讲解Java并发编程,涵盖并发基础、线程安全、同步机制、并发工具类、线程池及实际应用案例,助你掌握多线程开发核心技术,提升程序性能与响应能力。
116 0
|
3月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
303 83
|
3月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
140 0