肝完这篇线程池,我咳血了(二)

简介: 我们知道,线程需要的时候要进行创建,不需要的时候需要进行销毁,但是线程的创建和销毁都是一个开销比较大的操作。

AbstractExecutorService 抽象类

AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 中的部分方法,它相当一个干将,会分析大管家有哪些要做的工作,然后针对大管家的要求做一些具体的规划,然后找他的得力助手 ThreadPoolExecutor 来完成目标。

AbstractExecutorService 这个抽象类主要实现了 invokeAllinvokeAny 方法,关于这两个方法的源码分析我们会在后面进行解释。

ScheduledExecutorService 接口

ScheduledExecutorService 也是一个接口,它扩展了 ExecutorService 接口,提供了 ExecutorService 接口所没有的功能,ScheduledExecutorService 顾名思义就是一个定时执行器,定时执行器可以安排命令在一定延迟时间后运行或者定期执行。

它主要有三个接口方法,一个重载方法。下面我们先来看一下这两个重载方法。

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

schedule 方法能够延迟一定时间后执行任务,并且只能执行一次。可以看到,schedule 方法也返回了一个 ScheduledFuture 对象,ScheduledFuture 对象扩展了 Future 和 Delayed 接口,它表示异步延迟计算的结果。schedule 方法支持零延迟和负延迟,这两类值都被视为立即执行任务。

还有一点需要说明的是,schedule 方法能够接收相对的时间和周期作为参数,而不是固定的日期,你可以使用 date.getTime - System.currentTimeMillis() 来得到相对的时间间隔。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

scheduleAtFixedRate 表示任务会根据固定的速率在时间 initialDelay 后不断地执行。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

这个方法和上面的方法很类似,它表示的是以固定延迟时间的方式来执行任务。

scheduleAtFixedRate 和 scheduleWithFixedDelay 这两个方法容易混淆,下面我们通过一个示例来说明一下这两个方法的区别。

public class ScheduleTest {
    public static void main(String[] args) {
        Runnable command = () -> {
            long startTime = System.currentTimeMillis();
            System.out.println("current timestamp = " + startTime);
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("time spend = " + (System.currentTimeMillis() - startTime));
        };
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleAtFixedRate(command,100,1000,TimeUnit.MILLISECONDS);
    }
}

输出结果大致如下

微信图片_20220416151638.png

可以看到,每次打印出来 current timestamp 的时间间隔大约等于 1000 毫秒,所以可以断定 scheduleAtFixedRate 是以恒定的速率来执行任务的。

然后我们再看一下 scheduleWithFixedDelay 方法,和上面测试类一样,只不过我们把 scheduleAtFixedRate 换为了 scheduleWithFixedDelay 。

scheduledExecutorService.scheduleWithFixedDelay(command,10,1000,TimeUnit.MILLISECONDS);

然后观察一下输出结果

微信图片_20220416151641.png

可以看到,两个 current timestamp 之间的间隔大约等于 1000(固定时间) + delay(time spend) 的总和,由此可以确定 scheduleWithFixedDelay 是以固定时延来执行的。

线程池的描述

下面我们先来认识一下什么是线程池,线程池从概念上来看就是一个池子,什么池子呢?是指管理同一组工作线程的池子,也就是说,线程池会统一管理内部的工作线程。

wiki 上说,线程池其实就是一种软件设计模式,这种设计模式用于实现计算机程序中的并发。

比如下面就是一个简单的线程池概念图。

微信图片_20220416151645.png

注意:这个图只是一个概念模型,不是真正的线程池实现,希望读者不要混淆。

可以看到,这种其实也相当于是生产者-消费者模型,任务队列中的线程会进入到线程池中,由线程池进行管理,线程池中的一个个线程就是工作线程,工作线程执行完毕后会放入完成队列中,代表已经完成的任务。

上图有个缺点,那就是队列中的线程执行完毕后就会销毁,销毁就会产生性能损耗,降低响应速度,而我们使用线程池的目的往往是需要把线程重用起来,提高程序性能。

所以我们应该把执行完成后的工作线程重新利用起来,等待下一次使用。

线程池创建

我们上面大概聊了一下什么线程池的基本执行机制,你知道了线程是如何复用的,那么任何事物不可能是凭空出现的,线程也一样,那么它是如何创建出来的呢?下面就不得不提一个工具类,那就是 Executors

Executors 也是java.util.concurrent 包下的成员,它是一个创建线程池的工厂,可以使用静态工厂方法来创建线程池,下面就是 Executors 所能够创建线程池的具体类型。

微信图片_20220416151650.png

  • newFixedThreadPool:newFixedThreadPool 将会创建固定数量的线程池,这个数量可以由程序员通过创建 Executors.newFixedThreadPool(int nThreads)时手动指定,每次提交一个任务就会创建一个线程,在任何时候,nThreads 的值是最多允许活动的线程。如果在所有线程都处于活跃状态时有额外的任务被创建,这些新创建的线程会进入等待队列等待线程调度。如果有任何线程由于执行期间出现意外导致线程终止,那么在执行后续任务时会使用等待队列中的线程进行替代。
  • newWorkStealingPool:newWorkStealingPool 是 JDK1.8 新增加的线程池,它是基于 fork-join 机制的一种线程池实现,使用了 Work-Stealing 算法。newWorkStealingPool 会创建足够的线程来支持并行度,会使用多个队列来减少竞争。work-stealing pool 线程池不会保证提交任务的执行顺序。
  • newSingleThreadExecutor:newSingleThreadExecutor 是一个单线程的执行器,它只会创建单个线程来执行任务,如果这个线程异常结束,则会创建另外一个线程来替代。newSingleThreadExecutor 会确保任务在任务队列中的执行次序,也就是说,任务的执行是 有序的
  • newCachedThreadPool:newCachedThreadPool 会根据实际需要创建一个可缓存的线程池。如果线程池的线程数量超过实际需要处理的任务,那么 newCachedThreadPool 将会回收多余的线程。如果实际需要处理的线程不能满足任务的数量,则回你添加新的线程到线程池中,线程池中线程的数量不存在任何限制。
  • newSingleThreadScheduledExecutor:newSingleThreadScheduledExecutor 和 newSingleThreadExecutor 很类似,只不过带有 scheduled 的这个执行器哥们能够在一定延迟后执行或者定期执行任务。
  • newScheduledThreadPool:这个线程池和上面的 scheduled 执行器类似,只不过 newSingleThreadScheduledExecutor 比 newScheduledThreadPool 多加了一个 DelegatedScheduledExecutorService 代理,这其实包装器设计模式的体现。

上面这些线程池的底层实现都是由 ThreadPoolExecutor 来提供支持的,所以要理解这些线程池的工作原理,你就需要先把 ThreadPoolExecutor 搞明白,下面我们就来聊一聊 ThreadPoolExecutor。

ThreadPoolExecutor 类

ThreadPoolExecutor 位于 java.util.concurrent 工具类下,可以说它是线程池中最核心的一个类了。如果你要想把线程池理解透彻的话,就要首先了解一下这个类。

如果我们再拿上面家族举例子的话,ThreadPoolExecutor 就是一个家族的骨干人才,家族顶梁柱。ThreadPoolExecutor 做的工作真是太多太多了。

首先,ThreadPoolExecutor 提供了四个构造方法,然而前三个构造方法最终都会调用最后一个构造方法进行初始化

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
      // 1
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
    // 2
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
    // 3
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
    // 4
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

所以我们直接就来看一波最后这个线程池,看看参数都有啥,如果我没数错的话,应该是有 7 个参数(小学数学水平。。。。。。)

  • 首先,一个非常重要的参数就是 corePoolSize,核心线程池的容量/大小,你叫啥我觉得都没毛病。只不过你得理解这个参数的意义,它和线程池的实现原理有非常密切的关系。你刚开始创建了一个线程池,此时是没有任何线程的,这个很好理解,因为我现在没有任务可以执行啊,创建线程干啥啊?而且创建线程还有开销啊,所以等到任务过来时再创建线程也不晚。但是!我要说但是了,如果调用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就会在没有任务到来时创建线程,前者是创建 corePoolSize 个线程,后者是只创建一个线程。Lea 爷爷本来想让我们程序员当个懒汉,等任务来了再干;可是你非要当个饿汉,提前完成任务。如果我们想当个懒汉的话,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中。

微信图片_20220416151655.png

  • maximumPoolSize :又来一个线程池的容量,只不过这个是线程池的最大容量,也就是线程池所能容纳最大的线程,而上面的 corePoolSize 只是核心线程容量。

我知道你此时会有疑问,那就是不知道如何核心线程的容量和线程最大容量的区别是吧?我们后面会解释这点。

  • keepAliveTime:这个参数是线程池的保活机制,表示线程在没有任务执行的情况下保持多久会终止。在默认情况下,这个参数只在线程数量大于 corePoolSize 时才会生效。当线程数量大于 corePoolSize 时,如果任意一个空闲的线程的等待时间 > keepAliveTime 后,那么这个线程会被剔除,直到线程数量等于 corePoolSize 为止。如果调用了 allowCoreThreadTimeOut 方法,线程数量在 corePoolSize 范围内也会生效,直到线程减为 0。
  • unit :这个参数好说,它就是一个 TimeUnit 的变量,unit 表示的是 keepAliveTime 的时间单位。unit 的类型有下面这几种
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
  • workQueue:这个参数表示的概念就是等待队列,我们上面说过,如果核心线程 > corePoolSize 的话,就会把任务放入等待队列,这个等待队列的选择也是一门学问。Lea 爷爷给我们展示了三种等待队列的选择


微信图片_20220416151703.png

  • SynchronousQueue: 基于阻塞队列(BlockingQueue)的实现,它会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。使用 SynchronousQueue 阻塞队列一般要求maximumPoolSizes 为无界,也就是 Integer.MAX_VALUE,避免线程拒绝执行操作。
  • LinkedBlockingQueue:LinkedBlockingQueue 是一个无界缓存等待队列。当前执行的线程数量达到 corePoolSize 的数量时,剩余的元素会在阻塞队列里等待。
  • ArrayBlockingQueue:ArrayBlockingQueue 是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于 corePoolSize 时,多余的元素缓存在 ArrayBlockingQueue 队列中等待有空闲的线程时继续执行,当 ArrayBlockingQueue 已满时,加入 ArrayBlockingQueue 失败,会开启新的线程去执行,当线程数已经达到最大的 maximumPoolSizes 时,再有新的元素尝试加入 ArrayBlockingQueue时会报错
  • threadFactory:线程工厂,这个参数主要用来创建线程;
  • handler :拒绝策略,拒绝策略主要有以下取值



微信图片_20220416151707.png

  • AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。
  • DiscardPolicy: 直接丢弃任务,但是不抛出异常。
  • DiscardOldestPolicy:直接丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
  • CallerRunsPolicy:由调用线程处理该任务。
目录
打赏
0
0
0
0
4
分享
相关文章
微信小程序图表制作利器:ECharts组件的使用与技巧
微信小程序图表制作利器:ECharts组件的使用与技巧
377 1
实现在vue中自定义主题色彩切换
实现在vue中自定义主题色彩切换
135 0
在win10中使用ModelScope官方镜像
为在办公环境笔记本win10上测试ModelScope的开源模型 ,记录踩坑过程
2480 0
在win10中使用ModelScope官方镜像
【推荐几款实用的网盘资源搜索引擎】
云盘资源,浩如烟海,但缺乏搜索工具,让无数网友苦不堪言。幸运的是,现在有了强大的网盘搜索引擎,一切问题迎刃而解。轻松找到您需要的文件,快速下载,节省时间。享受便捷的云盘体验,释放您的创造力。无论是工作文档还是娱乐资源,网盘搜索引擎助您轻松搞定。解放您的搜索困扰,开启全新的云盘世界!
6971 1
阿里云服务器1M带宽速度解析,看看可以支持多少访问量?
阿里云服务器1M公网带宽的下载速度是多少?有人说阿里云1m带宽是小水管。那么,阿里云服务器1M带宽速度快吗?带着这些疑问,我们一起了解下1M带宽可以支持多少访问量?
1518 0
一文搞懂redis
NoSQL泛指非关系型数据库,随着web2.0互联网的诞生,传统的关系型数据库很难对付web2.0大数据时代!尤其是超大规模的高并发的社区,暴露出来很多难以克服的问题,NoSQL在当今大数据环境下发展的十分迅速,Redis是发展最快的。
一文搞懂redis
面对DNS劫持,只能坐以待毙吗?
借助 ARMS-云拨测,我们可实时对网站进行监控,实现分钟级别的监控,及时发现 DNS 劫持以及页面篡改。
面对DNS劫持,只能坐以待毙吗?
OSPF中的次优外部路由——Forwarding Address
在OSPF中外部路由是从ASBR(自治系统边界路由器)中导进来的(第五类的LSA)
467 0
OSPF中的次优外部路由——Forwarding Address
深入理解Fabric区块链Gossip机制
Gossip在Hyperledger Fabric中发挥着重要的作用。在这个教程中,我们将分阶段考察Fabric网络启动时gossip的运行机制,学习Fabric中的一些核心概念,例如主导节点/leader、锚节点/anchor等,理解 gossip是如何帮助Hyperledger Fabric成为一个可伸缩的联盟链平台。
1868 0
深入理解Fabric区块链Gossip机制
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问