Executor - 一文搞懂 ThreadPoolExecutor 与 BlockingQueue

简介: ThreadPool 是 java 的一种多线程处理方式,和前面提到了 RedisPool 类似,即通过一个 pool 批量管理,ThreadPool 管理线程,RedisPool 管理 Jedis 连接。下面主要介绍 ThreadPool 的参数含义,BlockingQueue 的几种类型以及 Executors 下 newCachedThreadPool、newFixedThreadPool、newSingleThreadPool 以及 newScheduleThreadPool 的使用与不同。....

一.引言

ThreadPool 是 java 的一种多线程处理方式,和前面提到了 RedisPool 类似,即通过一个 pool 批量管理,ThreadPool 管理线程,RedisPool 管理 Jedis 连接。下面主要介绍 ThreadPool 的参数含义,BlockingQueue 的几种类型以及 Executors 下 newCachedThreadPool、newFixedThreadPool、newSingleThreadPool 以及 newScheduleThreadPool 的使用与不同。

二.ThreadPoolExecutor

1.ThreadPoolExecutor 初始化

image.gif编辑

ThreadPoolExecutor 位于 java.util.concurrent 下,共提供四种构造方法,整体参数类似, 最常用的初始化方法是第一类,下面初始化一个简易的 ThreadPool:

val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue())

image.gif

其中 corePoolSize 为 6,maximumPoolSize 为 10,KeepAliveTime 为 3000 ms,使用 LinkedBlockingQueue 作为堵塞队列。

2.ThreadPoolExecutor 参数

corePoolSize-核心线程数 【工厂正式工👷】

核心线程是 ThreadPool 的主力,大部分时间核心线程均处于活跃状态。

maxPoolSize-最大线程数 【工厂临时工👷】

线程池内可以容纳的最大线程数,当线程数介于 maxPoolSize 与 maxPoolSize 之间且任务堵塞队列已满时,线程池会创建新线程执行任务;当线程数达到 maxPoolSize 且任务堵塞队列已满时,线程池会通过 RejectedExecutionHandler 拒绝任务,即构造方法的最后一个 handler 参数。

keepAliveTime-线程最大生命周期 【临时工雇佣时长⌚️】

线程的最大生命周期,这里的线程指超出 corePoolSize 介于 maxPoolSize 的线程,即上面提到的 '临时工',当这些线程在 keepAliveTime 期限内处于非运行状态时,该线程会被回收。

unit-时间单位 【临时工雇佣时长单位】

TimeUtil 内的枚举类单位,一般使用 MILLISECONDS 和 SECONDS。

workQueue-任务队列 【工厂任务仓库🏡】

任务队列,用于存储 ThreadPool 需要执行的任务,一般以 Runnable 的形式提交到 ThreadPool,workQueue 采用 BlockingQueue 作为存储对象。

threadFactory-线程工厂 【工人培训中心👨‍🏫】

为线程池创建新的线程,可以修改现成的名称、线程组、优先级以及守护程序状态。如果 Factor 被访问时无法生成新线程将返回 thread = null 并继续,但可能无法执行任何任务。

handler-拒绝处理 【任务筛选📃】

当提交的 Runnable 任务被执行者关闭或者线程达到 MaxPoolSize 且堵塞队列已满时,拒绝继续提交新的任务,拒绝策略默认会丢出 RejectedExecutionException 异常。自带的拒绝策略包含: (1) AbortPolicy (2) CallerRunsPolicy (3) DiscardPolicy (4) DiscardOldestPolicy 也可以基于 RejectedExecutionHandler 自定义拒绝策略。

3.ThreadPoolExecutor 工作模式

image.gif编辑

A.线程数 < corePoolSize

ThreadPool 看作是一个【工厂🏭】,当任务到来后,工厂开始招呼 corePoolSize【正式工👷】来干活。

B.线程数 >= corePoolSize

由于任务逐渐增多,corePoolSize【正式工👷】手里已经都有活干了,所以需要一个地方存放没有做的任务,这就需要 workQueue【仓库🏡】来存放来不及做的任务。

(1) 如果 workQueue【仓库🏡】内任务未满,那就继续将新来的任务放到仓库,等着 corePoolSize【正式工👷】做完手头的再来拿新的。

(2) 如果 workQueue【仓库🏡】爆仓了,为了保证 ThreadPool【工厂🏭】的效率,这时候需要招聘 MaxPoolSize【临时工👷】MaxPoolSize 来帮忙。

C.线程数 = MaxPoolSize

corePoolSize【正式工👷】和 MaxPoolSize【临时工👷】忙不过来了,ThreadPool【工厂🏭】处于满负荷状态,此时不再接收新的任务订单,新来的任务交给 handler【任务筛选📃】帮忙拒绝新来的订单。

D.任务执行完毕

旺季已过,ThreadPool【工厂🏭】没有新的订单,持续闲置 keepAliveTime 后,maxPoolSize 新增招聘的【临时工👷】变成了【毕业生👨‍🎓】,减少了工厂的开销,而 corePoolSize【正式工👷】也视情况休息待命。

4.上下文函数

ThreadPool 提供重写方法在线程执行前后提供函数,可以在函数内进行一些环境上下文以及相关日志的打印反应任务执行的进度

image.gif编辑

三.BlockingQueue

1.BlockingQueue 简介

任何 BlockingQueue 都可用于传输和保存已提交的任务,BlockingQueue 继承 Collection,所以同样具有集合的相关功能。由于其具备堵塞线程的工程,所以是线程安全的队列,从而保证一个任务不会被多次消费或重复存储的问题,基于这个特性,ThreadPool 采用 BlockingQueue 作为任务存储的载体。

A.如果运行的线程数小于 corePoolSize,则 executorPool 倾向于添加新的线程而不是在队列排队

B.如果运行的线程数大于等于 corePoolSize,则 executorPool 倾向于排队而不是增加新的线程

C.如果请求无法排队即队列已满,则会创建一个新线程,如果超过 MaxPoolSize,请求将会被拒绝

ThreadPool 主要使用如下 BlockingQueue 作为任务堵塞队列:

(1) ArrayBlockingQueue (2) LinkedBlockingQueue (3) SynchronousQueue

在 ThreadPool 背景下,BlockingQueue 其实是一个经典的【生产者-消费者】模型,下面初始化一个简单的生产者和消费者供后续理解:

class Producer(blockingQueue: BlockingQueue[Int], limit: Int = 10) extends Runnable {
    var current: Int = 0
    override def run(): Unit = {
      try {
        while (current < limit) {
          // 使用 offer 会导致数据丢失
          blockingQueue.put(current)
          println(s"Put-Num $current")
          current += 1
        }
      } catch {
        case e: Exception => {
          e.printStackTrace()
        }
      }
    }
  }
  class Consumer(blockingQueue: BlockingQueue[Int], limit: Int) extends Runnable {
    override def run(): Unit = {
      try {
        var state = true
        while (state) {
          val takeNum = blockingQueue.take()
          println(s"TakeNum $takeNum")
        }
      } catch {
        case e: Exception => {
          e.printStackTrace()
        }
      }
    }
  }

image.gif

生产者负责向 BlockingQueue 中 put 数据,消费者 Consumer 负责从 BlockingQueue 中 take 获取数据,这也就模拟了 ThreadPool 中任务的传递和存储过程。

2.BlockingQueue 常用方法

上面提到 BlockingQueue 继承了 Collection,所以集合的常用方法都可以使用。

public interface BlockingQueue<E> extends Queue<E> {}
public interface Queue<E> extends Collection<E> {}

image.gif

image.gif编辑

下面主要说下 BlockingQueue 的几种堵塞方法:

put - 添加元素

如果对列未满,则正常插入元素;如果对列已满,则造成堵塞无法插入,直至对列出现空闲位置

task - 获取对列首元素

take 方法获取对列首元素并将其移除,如果队列中无数据则阻塞,直至对列里出现新数据

offer - 添加元素

向队列添加元素,如果有容量并添加成功返回 true,如果容量受限添加失败则返回 false,该方法通常优于 add,因为后者可能因为添加失败而抛出异常

ThreadPool 的 BlockingQueue 中主要使用 put,take 方法维护任务的线程安全与一致性。

3.ArrayBlockingQueue

有界阻塞队列,由于限制了等待队列的长度,所以 ArrayBlockingQueue 能够避免无限任务增加至排队队列中,避免了资源被耗尽的情况,常与 maximumPoolSize 参数一起使用。使用大的 Queue 和小的 Pool 可以最大限度的减少 CPU 的利用率、操作系统资源和上下文切换的开销,但是吞吐量会受到一定影响。由于 ArrayBlockingQueue 的大小决定了队列的可存储任务量进而影响线程数到达 maximumPoolSize 的速度,所以需要根据 pool 的大小调整 ArrayBlockingQueue 队列的大小,避免任务快速堆积导致的 ThreadPool 异常。下面定义 main 函数结合上面的 Producer 和 Consumer 看一下 ArrayBlockingQueue 的使用:

def main(args: Array[String]): Unit = {
    val limit = 5
    val queue: ArrayBlockingQueue[Int] = new ArrayBlockingQueue(1)
    val producer = new Producer(queue, limit)
    val consumer = new Consumer(queue, limit)
    val produceThread = new Thread(producer)
    val consumeThread = new Thread(consumer)
    produceThread.start()
    consumeThread.start()
  }

image.gif

为了模拟队列生产满等待消费的情况,这里使用 capacity=1 的 ArrayBlockingQueue:

image.gif编辑

可以看到生产消费一一对应,Producer 生产1条数据到队列,队列满等待 Consumer 消费一条数据,队列有空余,如此循环往复,到最后一次数据被 take 拿走后,Producer 不再生产数据而 Consumer while(true) 持续获取数据,此时造成堵塞(程序未退出),待生产者再生产数据,consumer 则会解除堵塞。

4.LinkedBlockingQueue

无界阻塞队列,LinkedBlockingQueue 没有预定义的容量,默认大小为 Integer.MAX_VALUE,大多数情况下都会导致运行线程永远不会超过 corePoolSize,从而造成大量任务排队。该队列适合解决短时大量增加的需求,类似于热点事件时应对高 QPS,因为其容忍度很高可以存储过多任务,但是失去了扩展性,因为这里 maximumPoolSize 参数大部分时间不会生效。将上述 main 函数的 ArrayBlockingQueue 替换为 LinkedBlockingQueue 看一下效果:

def main(args: Array[String]): Unit = {
    val limit = 10
    val queue: LinkedBlockingQueue[Int] = new LinkedBlockingQueue[Int]()
    val producer = new Producer(queue, limit)
    val consumer = new Consumer(queue, limit)
    val produceThread = new Thread(producer)
    val consumeThread = new Thread(consumer)
    produceThread.start()
    consumeThread.start()
  }

image.gif

由于  LinkedBlockingQueue 无界,所以生产者生成不受限制提高了效率,消费者依次取出,直到取完而后发生堵塞,等待生产者再次生产数据。

image.gif编辑

Tips:

image.gif编辑

LinkedBlockingQueue 默认容量为 Integer.MAX_VALUE,同时也提供了设置容量的构造方法,添加 capacity 后,LinkedBlockingQueue 变为 ArrayBlockingQueue,下面初始化 LinkedBlockingQueue(1) 模拟一下:

val limit = 5
    val queue: LinkedBlockingQueue[Int] = new LinkedBlockingQueue[Int](1)

image.gif

image.gif编辑

Put 和 Take 又变成一一对应的关系,由于 println 的微延迟,可能出现 put 和 take 日志打印前后顺序的差别,但是一一对应的关系不会改变。

5.SynchronousQueue

同步队列,相对于其他几个 Queue 比较特别,可以看到官方给出的 isEmpty 方法和 size 方法,SynchronousQueue 没有内部存储空间,即 SynchronousQueue 不存储任务只传输任务,生产者的插入操作需要等到对应消费者的消费操作,即二者必须匹配出现,如果不消费则也不生产。

image.gif编辑

val queue: SynchronousQueue[Int] = new SynchronousQueue()

image.gif

将 queue 切换为  SynchronousQueue 再次运行 Consumer 和 Producer 依旧是一一对应的关系。

image.gif编辑

Tips:

下面验证一下生产者和消费者端的同步协议,这次修改消费者消费规则,生产者不变,生产5个数字,消费者消费4个,相当于少消费一个元素,看一下效果:

class Consumer(blockingQueue: BlockingQueue[Int], limit: Int) extends Runnable {
    override def run(): Unit = {
      try {
        var state = true
        while (state) {
          val takeNum = blockingQueue.take()
          if (takeNum >= limit - 2) {
            state = false
          }
          println(s"TakeNum $takeNum")
        }
      } catch {
        case e: Exception => {
          e.printStackTrace()
        }
      }
    }
  }

image.gif

取消最后一个数字的消费后可以看到最后一个数字的生产也受到影响:

image.gif编辑

四.ThreadPool

1.常见 ThreadPool

常见的 ThreadPool 有如下几种:

newCachedThreadPool: 缓存形线程池

newFixedThreadPool: 指定工作线程数量的线程池

newSingleThreadExecutor: 单一线程池

newScheduleThreadPool: 调度线程池

image.gif编辑

四种线程池均初始化自 java.util.concurrent.Executors 类并继承了 ExecutorService 接口,ThreadPool 支持使用 submit 单独提交 Runnable 也可以通过 invokeAll 批量提交 Runnable 任务并返回 Future 跟踪异步的任务。除了提交任务外,还提供了 shutdown、shotdonwNow 和 awaitTermination 方法支持线程池优雅的关闭,这个之前博客有细讲过,有需要的同学可以移步Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战

def addRunnable(executor: ExecutorService, limit: Int = 5): Unit = {
    (0 until limit).foreach(epoch => {
      executor.submit(new Runnable {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          println(s"TaskId: $taskId Current: $epoch")
        }
      })
    })
  }

image.gif

下面分别介绍四种线程池的特点与使用场景,并且基于上述 addRunnable 函数进行功能展示,这里传入参数为 ExecutorService,适用于上面四种线程池。

2.newCachedThreadPool

newCachedThreadPool 创建一个可以缓存线程的线程池,可以灵活控制回收空闲线程

image.gif编辑

newCachedThreadPool 基于 ThreadPoolExecutor 构建,其余三个线程池同理,CachedPool 的 corePoolSize 为 0,MaxPoolSize 为 Integer.MaxValue 代表其在有需要的情况下可以大量创建新线程,新线程的空闲时间 keepAliveTime 为 60s,其堵塞队列为 SynchronousQueue。

A.根据需要创建新线程,也可以重用之前构造的线程(cache),避免线程的多次初始化,通常会提升性能

B.60s 未使用的线程将终止并删除,所以空闲时间够长不会占用资源

基于上述两个特点,newCachedThreadPool 适合短时间内需要大量执行的快速任务,即短暂的高峰值任务,而且执行完毕后在低峰时线程全部关闭可以节省资源。但是 MaxPoolSize = Interger.MAX_VALUE 也使其面对一定的内存爆炸风险。

def testCachedThreadPool(): Unit = {
      val cachePool: ExecutorService = Executors.newCachedThreadPool()
      addRunnable(cachePool)
      getCurrentThread()
      Thread.sleep(60000)
      println("Thread Sleep 60s...")
      getCurrentThread()
    }

image.gif

-> 适合短时间执行快速任务

getCurrentThread 方法可以获取当前本机活跃线程,可以看到添加 5 个 Runnable 后该线程池直接创建 5 个线程执行任务并且在 sleep 60s 后对应线程全部关闭,任务结束,说明线程池不会一直占用资源。如果运行任务之间有重叠且有线程空闲,就可以用到 cached 的线程,提高利用率,减少线程重复创建的开销。

image.gif编辑

3.newFixedThreadPool

固定线程池,固定是指 corePoolSize 的数量固定,其 corePoolSize 和 maxPoolSize 均由用户传入的 nThreads 决定且相等,keepAliveTime 为 0 且使用 LinkedBlockingQueue 作为堵塞队列

image.gif编辑

A.线程池 core 核心数与 maxPoolSize 相等,固定数量的线程共享一个无边界的阻塞队列

B.keepAliveTime = 0 线程容易失效被关闭

基于上述两个特点,newFixedThreadPool 的优点很明显,即用户可以根据资源很好的控制线程的数量,适合稳定的线性工作场景,但是缺点是由于 keepAliveTime = 0 引起的多次新建线程,会对程序造成不小开销且由于无阻塞队列的使用容易造成堵塞和内存风险。

def testFixedPool(): Unit = {
      val fixedPool: ExecutorService = Executors.newFixedThreadPool(4)
      addRunnable(fixedPool)
      getCurrentThread()
      Thread.sleep(1000)
      println("Thread Sleep 1s...")
      getCurrentThread()
    }

image.gif

-> 适合稳定的线性工作场景

可以看到虽然任务已经结束了,但是线程依旧存活,因为 fixed 固定线程在等待任务的到来而 LinkedBlockingQueue 内没有新任务送达,所以使用 newFixedThreadPool 需要注意尽量用在连续稳定执行的任务上,如果任务彻底结束需要关闭线程池,否则 nThreads 个线程会占用资源。

image.gif编辑

-> 优雅关闭线程池

def testFixedPool(): Unit = {
      val fixedPool: ExecutorService = Executors.newFixedThreadPool(4)
      addRunnable(fixedPool)
      getCurrentThread()
      Thread.sleep(1000)
      println("Thread Sleep 1s...")
      getCurrentThread()
      fixedPool.shutdown()
      while (!fixedPool.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
        println("线程池任务未结束!")
      }
      println("线程池任务运行结束")
      getCurrentThread()
    }

image.gif

使用 shutdown + awaitTermination 的组合可以优雅关闭线程池,需要注意单独调用 shutdow 可能会由于线程内的死循环导致线程池无法正常退出。

image.gif编辑

4.newSingleThreadExecutor

singleThread,单线程处理器,这里已经没有 pool 的池子的概念,相当于一个 mini 版的 newFixedThreadPool,即 nThreads = 1 的 newFixedThreadPool,因为单线程的原因,批量任务背景下使用场景比较局限

image.gif编辑

A.适合一些单线程的统计或者异步的有时效性的任务

-> 简单使用

def testSinglePool(): Unit = {
      val singlePool: ExecutorService = Executors.newSingleThreadExecutor()
      addRunnable(singlePool)
      getCurrentThread()
      Thread.sleep(1000)
      println("Thread Sleep 1s...")
      getCurrentThread()
    }

image.gif

image.gif编辑

5.newScheduleThreadPool

调度线程池,适用于定时分发任务或者分间隔分发任务,这里底层也不再使用 threadPool

image.gif编辑

一般分为两种模式:

-> scheduleAtFixedRate - 以 period 为单位,周期的提交并执行任务

image.gif编辑

-> scheduleWithFixedDelay - 以 delay 为单位,按间隔分别提交并执行任务

image.gif编辑

详细的代码分析与示例可以移步 newScheduledThreadPool : scheduleAtFixedRate 与 scheduleWithFixedDelay 详解

五.总结

ExecutorThreadPool、BlockingQueue、ThreadPool 的基本知识和使用大致就这些,这里只是浅显的对其定义和使用进行了展示,其内部复杂的同步机制、协同机制更加精彩,有机会可以继续深入学习!

目录
相关文章
|
存储 Java 调度
Java多线程 ThreadPoolExecutor自定义线程池
Java多线程 ThreadPoolExecutor自定义线程池
414 0
Java多线程 ThreadPoolExecutor自定义线程池
|
2月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解03-阻塞队列之LinkedBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是 Java 中的两种阻塞队列实现,它们的主要区别在于: 1. **数据结构**:ArrayBlockingQueue 采用固定大小的数组实现,而 LinkedBlockingQueue 则使用链表实现。 2. **容量**:ArrayBlockingQueue 在创建时必须指定容量,而 LinkedBlockingQueue 可以在创建时不指定容量,默认容量为 Integer.MAX_VALUE。 总结起来,如果需要高效并发且内存不是主要考虑因素,LinkedBlockingQueue 通常是更好的选择;
183 1
|
8月前
|
缓存 Java
Java线程池ExecutorService
Java线程池ExecutorService
63 0
浅谈Java线程池中的ThreadPoolExecutor工具类
浅谈Java线程池中的ThreadPoolExecutor工具类
342 0
|
缓存 Java 调度
JUC系列学习(一):线程池Executor框架及其实现ThreadPoolExecutor
`Executor` 框架将任务的提交与任务的执行**解耦**了。
|
安全 Java
Java并发 之 线程池系列 (2) 使用ThreadPoolExecutor构造线程池
Java并发 之 线程池系列 (2) 使用ThreadPoolExecutor构造线程池
157 0
Java并发 之 线程池系列 (2) 使用ThreadPoolExecutor构造线程池
|
Java Spring
线程池:第四章:ThreadPoolTaskExecutor和ThreadPoolExecutor有何区别?
线程池:第四章:ThreadPoolTaskExecutor和ThreadPoolExecutor有何区别?
163 0
线程池:第四章:ThreadPoolTaskExecutor和ThreadPoolExecutor有何区别?
|
网络协议 Java
Java并发:线程池详解(ThreadPoolExecutor)
Java并发:线程池详解(ThreadPoolExecutor)
136 0
|
缓存 Java
Java线程池ThreadPoolExecutor类使用详解
Java线程池ThreadPoolExecutor类使用详解
|
Java 调度
Java并发系列之7 深入理解线程池ThreadPoolExecutor
Java并发系列之7 深入理解线程池ThreadPoolExecutor