SmartLifecycle 包含了 Phased 接口以及默认实现:
public interface SmartLifecycle extends Lifecycle, Phased { int DEFAULT_PHASE = Integer.MAX_VALUE; @Override default int getPhase() { return DEFAULT_PHASE; } }
可以看出,只要实现了 SmartLifecycle,Phase 默认就是最大值。所以优雅关闭的 Lifecycle: WebServerGracefulShutdownLifecycle
的 Phase 就是最大值,也就是属于最先被关闭的那一组。
总结接入点 - Spring Boot + Undertow & 同步 Servlet 环境
1. 接入点一 - 通过添加实现 SmartLifecycle 接口的 Bean,指定 Phase 比 WebServerGracefulShutdownLifecycle 的 Phase 小
前面的分析中,我们已经知道了:WebServerGracefulShutdownLifecycle
的 Phase 就是最大值,也就是属于最先被关闭的那一组。我们想要实现的是在这之后加入一些优雅关闭的逻辑,同时在 Destroy Bean (前面提到的 ApplicationContext 关闭的第四步)之前(即 Bean 销毁之前,某些 Bean 销毁中就不能用了,比如微服务调用中的一些 Bean,这时候如果还有任务没完成调用他们就会报异常)。那我们首先想到的就是加入一个 Phase 在这时候的 Lifecycle,在里面实现我们的优雅关闭接入,例如:
@Log4j2 @Component public class BizThreadPoolShutdownLifecycle implements SmartLifecycle { private volatile boolean running = false; @Override public int getPhase() { //在 WebServerGracefulShutdownLifecycle 那一组之后 return SmartLifecycle.DEFAULT_PHASE - 1; } @Override public void start() { this.running = true; } @Override public void stop() { //在这里编写的优雅关闭逻辑 this.running = false; } @Override public boolean isRunning() { return running; } }
这样实现兼容性比较好,并且升级底层框架依赖版本基本上不用修改。但是问题就是,可能会引入某个框架里面带 Lifecycle bean,虽然他的 Phase 是正确的,小于 WebServerGracefulShutdownLifecycle 的,但是 SmartLifecycle.DEFAULT_PHASE - 1 即等于我们自定义的 Lifecyce, 并且这个正好是需要等待我们的优雅关闭结束再关闭的,并且由于 Bean 加载顺序问题导致框架的 Lifecycle 又跑到了我们自定义的 Lifecycle 前进行 stop。这样就会有问题,但是问题出现的概率并不大。
2. 接入点二 - 通过反射向 Undertow 的 GracefulShutdownHandler 的List<ShutdownListener> shutdownListeners
中添加 ShutdownListener 实现
这种实现方式,很明显,限定了容器必须是 undertow,并且可能升级的兼容性不好。但是可以在 Http 线程池优雅关闭后立刻执行我们的优雅关闭逻辑,不用担心引入某个依赖导致我们自定义的优雅关闭顺序有问题。与第一种孰优孰劣,请大家自行判断,简单实现是:
@Log4j2 @Componenet //仅在包含 Undertow 这个类的时候加载 @ConditionalOnClass(name = "io.undertow.Undertow") public class ThreadPoolFactoryGracefulShutDownHandler implements ApplicationListener<ApplicationEvent> { //获取操作 UndertowWebServer 的 gracefulShutdown 字段的句柄 private static VarHandle undertowGracefulShutdown; //获取操作 GracefulShutdownHandler 的 shutdownListeners 字段的句柄 private static VarHandle undertowShutdownListeners; static { try { undertowGracefulShutdown = MethodHandles .privateLookupIn(UndertowWebServer.class, MethodHandles.lookup()) .findVarHandle(UndertowWebServer.class, "gracefulShutdown", GracefulShutdownHandler.class); undertowShutdownListeners = MethodHandles .privateLookupIn(GracefulShutdownHandler.class, MethodHandles.lookup()) .findVarHandle(GracefulShutdownHandler.class, "shutdownListeners", List.class); } catch (Exception e) { log.warn("ThreadPoolFactoryGracefulShutDownHandler undertow not found, ignore fetch var handles"); } } @Override public void onApplicationEvent(ApplicationEvent event) { //仅处理 WebServerInitializedEvent 事件,这个是在 WebServer 创建并初始化完成后发出的事件 if (event instanceof WebServerInitializedEvent) { WebServer webServer = ((WebServerInitializedEvent) event).getWebServer(); //检查当前的 web 容器是否是 UnderTow 的 if (webServer instanceof UndertowWebServer) { GracefulShutdownHandler gracefulShutdownHandler = (GracefulShutdownHandler) undertowGracefulShutdown.getVolatile(webServer); //如果启用了优雅关闭,则 gracefulShutdownHandler 不为 null if (gracefulShutdownHandler != null) { var shutdownListeners = (List<GracefulShutdownHandler.ShutdownListener>) undertowShutdownListeners.getVolatile(gracefulShutdownHandler); shutdownListeners.add(shutdownSuccessful -> { if (shutdownSuccessful) { //添加你的优雅关闭逻辑 } else { log.info("ThreadPoolFactoryGracefulShutDownHandler-onApplicationEvent shutdown failed"); } }); } } } } }
如何实现额外线程池的优雅关闭
现在我们知道如何接入了,那么针对项目中的自定义线程池,如何把他们关闭呢?首先肯定是要先拿到所有要检查的线程池,这个不同环境方式不同,实现也比较简单,这里不再赘述,我们假设拿到了所有线程池,并且线程池只有以下两种实现(其实就是 JDK 中的两种线程池,忽略定时任务线程池 ScheduledThreadPoolExecutor):
java.util.concurrent.ThreadPoolExecutor
:最常用的线程池java.util.concurrent.ForkJoinPool
:ForkJoin 形式的线程池
针对这两种线程池如何判断他们是否已经没有任务在执行了呢?参考代码:
public static boolean isCompleted(ExecutorService executorService) { if (executorService instanceof ThreadPoolExecutor) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; //对于 ThreadPoolExecutor,就是判断没有任何 active 的线程了 return threadPoolExecutor.getActiveCount() == 0; } else if (executorService instanceof ForkJoinPool) { //对于 ForkJoinPool,复杂一些,就是判断既没有活跃线程,也没有运行的线程,队列里面也没有任何任务并且并没有任何等待提交的任务 ForkJoinPool forkJoinPool = (ForkJoinPool) executorService; return forkJoinPool.getActiveThreadCount() == 0 && forkJoinPool.getRunningThreadCount() == 0 && forkJoinPool.getQueuedTaskCount() == 0 && forkJoinPool.getQueuedSubmissionCount() == 0; } return true; }
如何判断所有线程池都没有任务了呢?由于实际应用可能很放飞自我,比如线程池 A 可能提交任务到线程池 B,线程池 B 有可能提交任务到线程池 C,线程池 C 又有可能提交任务给 A 和 B,所以如果我们依次遍历一轮所有线程池发现上面这个方法 isCompleted 都返回 true,也是不能保证所有线程池一定运行完了的(比如我依次检查 A,B,C,检查到 C 的时候,C 又提交任务到了 A 和 B 并结束,C 检查发现任务都完成了,但是之前检查过的 A,B 又有了任务未完成)。所以我的解决办法是:打乱所有线程池,遍历,检查每个线程池是否完成,如果检查发现都完成则计数器加 1,只要有未完成的就不加并清零计数器。不断循环,每次循环 sleep 1 秒,直到计数器为 3(也就是连续三次按随机顺序检查所有线程池都没有任何任务):
List<ExecutorService> executorServices = 获取所有线程池 for (int i = 0; i < 3; ) { //连续三次,以随机乱序检查所有的线程池都完成了,才认为是真正完成 Collections.shuffle(executorServices); if (executorServices.stream().allMatch(ThreadPoolFactory::isCompleted)) { i++; log.info("all threads pools are completed, i: {}", i); } else { //连续三次 i = 0; log.info("not all threads pools are completed, wait for 1s"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ignored) { } } }
RocketMQ-spring-starter 中是如何处理的
rocketmq 的官方 spring boot starter:https://github.com/apache/rocketmq-spring
其中是采用我们这里说的第一种接入点方式,将消费者容器做成 SmartLifcycle(Phase 为最大值,属于最优先的关闭组),在里面加入关闭逻辑:
DefaultRocketMQListenerContainer
@Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { //关闭消费者 consumer.shutdown(); } setRunning(false); } }
微信搜索“我的编程喵”关注公众号,加作者微信,每日一刷,轻松提升技术,斩获各种offer:
我会经常发一些很好的各种框架的官方社区的新闻视频资料并加上个人翻译字幕到如下地址(也包括上面的公众号),欢迎关注: