Executor - Shutdown、ShutdownNow、awaitTermination 详解与实战

简介: 使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。

一.引言

使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。

二.API 释义

1.shutdown

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

image.gif

shutdown 主要工作如下:

Initiates an orderly shutdown in which previously submitted

tasks are executed - 之前提交的继续执行

but no new tasks will be accepted. - 不再接收新任务

This method does not wait for previously submitted tasks to

complete execution. - 该方法不会等待以前提交的任务完成,可以配合 awaitTermination 方法等待。

2.shutdownNow

/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

image.gif

shutdownNow 主要工作如下:

Attempts to stop all actively executing tasks - 尝试停止正在执行的任务

halts the processing of waiting tasks - 停止等待的任务

and returns a list of the tasks that were awaiting execution - 返回等待列表的任务

These tasks are drained (removed) from the task queue upon return from this method. - 从该方法返回时,将这些任务从队列中删除

Tips:

该任务不会等待主动执行的任务终止,可以配合 awaitTermination 方法等待。

该方法尽可能停止主动执行的任务,通过 Thread.interrupt 实现,未能响应中断的任务可能不会停止

该方法与 shutdown 差别在 interruptIdleWorkers 和 interruptWorkers,后者会调用 interrutp 方法到正在执行的 worker 上,而前者只会取消等待的任务。

3.awaitTermination

* Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

image.gif

awaitTermination 方法会等待线程到达 TERMINATED 即已终止的状态。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,注意该方法不会关闭线程池,只负责延时以及检测状态。

4.runState

A.状态

线程 runState 的几种状态与转换

RUNNING:接受新任务并处理排队的任务

SHUTDOWN:不接受新任务,但处理排队的任务

STOP:不接受新任务,不处理排队任务,并中断正在进行的任务

TIDYING:所有任务都已终止,workerCount 为零,线程转换到状态 TIDYING,将运行 terminate() Hook

TERMINATED:终止()已完成

B.转换

RUNNING -> SHUTDOWN : 在调用 shutdown() 时,可能隐含在 finalize() 中

(RUNNING or SHUTDOWN) -> STOP : 调用 shutdownNow()

SHUTDOWN -> TIDYING:当队列和池都为空时

STOP -> TIDYING:当池为空时

TIDYING -> TERMINATED:当 terminate() 钩子方法完成时

三.实践

1.shutdown + awaitTermination(500ms)

processNumBuffer 为 Runnable 内的逻辑,针对给定的一批数字求出最小,最大值并返回结果字符串保存,共 500000 个数字,每 50000 个数字生成一个 Runnable。执行逻辑后调用 shutdown + awaitTermination。getCurrentThread 方法负责打印当前的可用线程,用来观测调用 shutdown 和 awaitTermination 后线程池中线程的变化。

import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
object ExecutorPoolShutdown {
  // Runnable 内执行逻辑,寻找上下界
  def processNumBuffer(nums: Array[Int], taskId: Long): String = {
    val maxNum = nums.max
    val minNum = nums.min
    val log = s"TaskId: $taskId Length: ${nums.length} Min: $minNum Max: $maxNum"
    log
  }
  def main(args: Array[String]): Unit = {
    // 存储所有 Task 的日志
    val logAll = new StringBuffer()
    // 存储所有可用的 TaskId
    val taskSet = new CopyOnWriteArraySet[Long]()
    // 初始化线程池
    val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
    val numIterator = (0 until 500000).iterator
    // 每50000个数据生成一个 Task
    numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId) // 添加 taskID
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n") // 添加统计日志
        }
      })
    })
    // 调用 shutdown
    executor.shutdown()
    // 获取当前线程
    println("After shutdown...")
    getCurrentThread()
    val st = System.currentTimeMillis()
    // 调用 awaitTerminatio
    val state = executor.awaitTermination(500, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")
    // 再次获取当前线程
    println("After awaitTermination...")
    getCurrentThread()
    println(logAll.toString)
    println(taskSet.toArray().mkString(","))
  }
}

image.gif

由于任务逻辑比较简单,调用 shutdow 和 awaitTermation 后线程数均为 5 且是和 pool 无关的线程,说明线程池的 Task 在 shutdow 后就已经全部运行完毕了,这点从  awaitTermation 返回的状态为 true 和等待时间为 0 也可以看出来。最后就是 500000 / 50000 = 10 共计10条 log,再次说明 task 都执行完毕,最后显示本次任务执行中共使用了 6 个 Task。

image.gif编辑

Tips:

shutdown 后如果线程池已经结束,则  awaitTermation 方法不会等待,直接返回 true。

2.任务延时 + shutdown + awaitTermination(500ms)

上面的任务执行速度求最大最小值,执行速度很快,所以不好看到 shutdown 和 awaitTerminatioN 的作用,下面模拟一个运行时间稍长的任务,在原始 Runnable 中加入 sleep(2000),延长程序2s的运行时间:

numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId)
          Thread.sleep(2000)
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n")
        }
      })
    })

image.gif

再次运行:

image.gif编辑

这次的结果和上面完全不同,首先是不管是调用 shutdown 还是 awaitTermination,可以看到活跃线程中都包含 pool-1-thread-x,即这两个 API 调用后逻辑仍在运行没有结束;再看 awaitTermination 返回的状态为 false,代表线程池未完全关闭,cost=507ms,等待 500ms 后线程池仍未完全退出,但是主线程已经结束,所以 LogAll 里没有日志加入,最后只打印出了使用过的 TaskId。

Tips:

shutdown 后线程仍在继续运行,对应前面提到的 shutdown 之后当前运行的任务继续执行,只不过不会增加新任务,而 awaitTermination 后线程依然活跃,对应前面的 awaitTermination 方法只返回线程池关闭状态,不会关闭线程池。

3.任务延时 + shutdown + awaitTermination(2000ms)

调整 awaitTermination 的等待时间,从500ms提升至2000ms

val st = System.currentTimeMillis()
    val state = executor.awaitTermination(2000, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")

image.gif

再次运行:

image.gif编辑

和上面相比,executor 的关闭状态返回的仍然是 false,但是等到的时间延迟至约 2000ms,可以看到随着等待时间增加,一部分 task已经完成了,但是并没有全部完成。将延时时间修改为 5000ms 再次运行:

image.gif编辑

等到 4000ms 时 executor 就结束了,所以 awaitTermination 返回为 true,后续也没有 pool-1-thread-x 相关的 task,最终的输出 log 也完整。

4.shutdownNow + awaitTermination(500ms)

shutdownNow 相比 shutdown 会多一个返回值,即等待列表的任务。

val tasks = executor.shutdownNow()
    tasks.asScala.foreach(task => {
      println(task)
    })

image.gif

运行一下:

image.gif编辑

由于任务运行很快,所以快速任务下,shutdown 和 shutdownNow 结果相同,awaitTermination 返回为 ture 且未等待。

5.任务延时 + shutdownNow + awaitTermination(500ms)

numIterator.grouped(50000).foreach(group => {
      executor.execute(new Runnable() {
        override def run(): Unit = {
          val taskId = Thread.currentThread().getId
          taskSet.add(taskId)
          Thread.sleep(2000)
          val res = processNumBuffer(group.toArray, taskId)
          logAll.append(res + "\n")
        }
      })
    })

image.gif

任务增加 sleep 2000ms 后再运行一下:

image.gif编辑

最上面的任务显示 :

Caused by: java.lang.InterruptedException: sleep interrupted

image.gif

即任务 sleep 期间被 interrupt 了,所以执行的 task 结束,和 shutdown 相比,正在执行的线程并不会被 interrupt;下面打印出来4个 Runnable,因为这四个 Task 还在等待队列中,shutdownNow 直接把他们返回了;最后下面因为 executor 已经关闭,所以状态为 true,等待时间为0。

6.任务延时 + awaitTermination(xxxms) + shutdownNow

通过上面 shotdownNow + awaitTermination 的示例中可以看到,如果任务不能很快执行,那么调用 shotdownNow 的结果就是所有 task 都没结束,任务没有任何改动。如果希望对任务设定一定期间,能完成多少完成多少,可以调整顺序,修改为先 awaitTermination 再 shutdownNow:

val st = System.currentTimeMillis()
    val state = executor.awaitTermination(3000, TimeUnit.MILLISECONDS)
    val cost = System.currentTimeMillis() - st
    println(s"Executor State: $state Cost: $cost")
    println("After awaitTermination...")
    getCurrentThread()
    val tasks = executor.shutdownNow()
    tasks.asScala.foreach(task => {
      println(task)
    })
    println("After shutdown...")
    getCurrentThread()

image.gif

调整完顺序后再次运行:

image.gif编辑

等待时间设置为 3000ms,相当于你对你的任务要求是: 3000ms 内能跑完多少算多少,没跑完就不要了;可以看到 awaitTermination 到期后返回状态为 false,说明线程内的任务还未全部结束;再看下面 shutdownNow 后,线程里已经不存在 pool-1-thread-x ,且打印出部分结果,共计6条;最下面是 interrupt 其他正在运行的 task 打印的异常栈,最终程序 exit(0) 正常退出。

四.总结

经过上面的代码分析,对几个方法进行一下总结:

shutdown : 等待执行的任务执行,不再添加新任务

shutdownNow:interrupt 当前执行的任务,不再添加新任务,返回等待的任务

awaitTermination:不影响线程池开关状态,只返回状态,可以堵塞线程等待一定时间

可以结合上面的6个例子以及自己任务的耗时和容忍度,决定怎么组合上面三个 API,如果一定要等到 executor 内的 task 都运行完毕再关闭 executor 且不好估算内部 task 运行时间,可以采用如下操作:

executor.shutdown()
      while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
         println("Task is Running...")
      }
      println("Task is Finish!")

image.gif

通过 while true 保证线程池内 task 都运行完毕才进行后续操作,不过需要注意 Task 内部不要有死循环,否则会导致无法跳出该 While 循环,整个程序堵塞在这里。

目录
相关文章
|
9月前
|
存储 缓存 NoSQL
mybatisplus一二级缓存
MyBatis-Plus 继承并优化了 MyBatis 的一级与二级缓存机制。一级缓存默认开启,作用于 SqlSession,适用于单次会话内的重复查询;二级缓存需手动开启,跨 SqlSession 共享,适合提升多用户并发性能。支持集成 Redis 等外部存储,增强缓存能力。
|
数据采集 JavaScript Java
CompletableFuture异步编排,你还不会?
本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。
681 3
CompletableFuture异步编排,你还不会?
|
Java 开发者 Spring
Spring AOP 底层原理技术分享
Spring AOP(面向切面编程)是Spring框架中一个强大的功能,它允许开发者在不修改业务逻辑代码的情况下,增加额外的功能,如日志记录、事务管理等。本文将深入探讨Spring AOP的底层原理,包括其核心概念、实现方式以及如何与Spring框架协同工作。
|
SQL 监控 Oracle
Oracle SQL性能优化全面指南
在数据库管理领域,Oracle SQL性能优化是确保数据库高效运行和数据查询速度的关键
1781 6
|
网络协议 Java 关系型数据库
一篇文章彻底理解数据库的各种 JDBC 超时参数 2
一篇文章彻底理解数据库的各种 JDBC 超时参数
|
算法 前端开发 Java
支撑每秒数百万订单无压力,SpringBoot + Disruptor 太猛了!
本文详细介绍如何通过 Spring Boot 集成 Disruptor 实现每秒处理数百万订单的高性能系统。Disruptor 是一种无锁并发框架,采用环形缓冲区和无锁算法,提供极低延迟和高吞吐量。文章涵盖 Maven 配置、事件工厂、处理器及生产者实现,并通过 REST API 和 Thymeleaf 展示订单创建流程。Disruptor 在高并发场景下表现出色,是解决高性能并发处理的理想方案。
|
存储 安全 物联网
Android经典实战之跳转到系统设置页面或其他系统应用页面大全
本文首发于公众号“AntDream”,关注获取更多技巧。文章总结了Android开发中跳转至系统设置页面的方法,包括设备信息、Wi-Fi、显示与声音设置等,并涉及应用详情与电池优化页面。通过简单的Intent动作即可实现,需注意权限与版本兼容性。每日进步,尽在“AntDream”。
2266 2
|
自然语言处理 算法 搜索推荐
NLTK模块使用详解
NLTK(Natural Language Toolkit)是基于Python的自然语言处理工具集,提供了丰富的功能和语料库。本文详细介绍了NLTK的安装、基本功能、语料库加载、词频统计、停用词去除、分词分句、词干提取、词形还原、词性标注以及WordNet的使用方法。通过示例代码,帮助读者快速掌握NLTK的核心功能。
3203 1
|
消息中间件 缓存 Kafka
原理剖析| 一文搞懂 Kafka Producer(上)
本文介绍了Apache Kafka 3.7的Producer使用及原理,讲解了如何创建和使用Producer,展示了一个发送消息的示例代码,并介绍了ProducerRecord和Callback接口。ProducerRecord包含topic、partition等属性,Callback用于发送消息后的回调处理。接着阐述了send、flush和close方法的功能。文章还探讨了核心组件,包括ProducerMetadata、RecordAccumulator、Sender和TransactionManager,以及消息发送流程。最后,讨论了元数据刷新、分区选择、消息攒批和超时处理等实现细节。
1003 0
原理剖析| 一文搞懂 Kafka Producer(上)
|
Java Linux Maven
用sdkman在linux上管理多个java版本
本文介绍了如何在Linux上使用SDKMAN来管理多个Java版本,包括安装SDKMAN、验证安装、列出和安装不同版本的JDK、Maven和Gradle,以及如何切换使用不同版本。
1559 0