并发设计模式实战系列(11):两阶段终止(Two-Phase Termination)

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十一章,废话不多说直接开始~


image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十一章两阶段终止(Two-Phase Termination),废话不多说直接开始~

目录

一、核心原理深度拆解

1. 两阶段终止流程

2. 关键设计要点

3. 中断处理原则

二、生活化类比:餐厅打烊流程

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键实现细节

四、横向对比表格

1. 不同终止策略对比

2. 中断处理方案对比

五、高级实践技巧

1. 组合关闭多个服务

2. 带钩子的终止流程

3. 分布式系统终止方案

六、分布式场景下的两阶段终止(扩展)

1. 跨节点协调终止流程

2. 代码示例:基于ZooKeeper的实现

七、性能优化与陷阱规避(扩展)

1. 关键性能指标监控

2. 常见陷阱及解决方案

八、与其他模式的协同应用(扩展)

1. 与断路器模式结合

2. 与Actor模型整合

九、生产环境检查清单

1. 终止流程验证步骤

2. 关键日志记录点

十、终极对比:各类终止策略

1. 单机 vs 分布式终止

2. 超时配置黄金法则


一、核心原理深度拆解

1. 两阶段终止流程

┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│  发出终止信号  │───>│ 处理未完成请求 │───>│ 释放资源并退出 │
└───────────────┘    └───────────────┘    └───────────────┘

image.gif

2. 关键设计要点

  • 阶段1(通知阶段)
  • 通过 volatile标志位interrupt() 发出终止信号
  • 保证信号能被所有工作线程感知(内存可见性)
  • 阶段2(清理阶段)
  • 完成当前任务处理(拒绝新任务)
  • 关闭线程池/释放文件句柄/数据库连接等资源

3. 中断处理原则

while (!Thread.currentThread().isInterrupted()) {
    try {
        // 正常任务处理...
    } catch (InterruptedException e) {
        // 1. 重新设置中断标志(保持中断状态)
        Thread.currentThread().interrupt();
        // 2. 执行资源清理
        cleanup();
        break;
    }
}

image.gif


二、生活化类比:餐厅打烊流程

系统组件

现实类比

核心行为

阶段1通知

门口挂"停止营业"牌

不再接待新顾客

阶段2清理

服务员处理现有顾客

完成已点餐品,收拾桌椅

资源释放

关闭厨房设备

断电、锁门、清理食材

  • 异常处理:如果有顾客赖着不走(无法中断的任务),强制清场(超时机制)

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class TwoPhaseTermination {
    // 终止标志(volatile保证可见性)
    private volatile boolean shutdownRequested = false;
    
    // 工作线程池
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    
    // 监控线程
    private Thread monitorThread;
    
    public void start() {
        monitorThread = new Thread(() -> {
            while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
                try {
                    // 模拟监控任务
                    System.out.println("[Monitor] 检查系统状态...");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // 收到中断信号,准备终止
                    Thread.currentThread().interrupt();
                    System.out.println("[Monitor] 收到终止信号");
                }
            }
            System.out.println("[Monitor] 执行清理工作...");
        });
        monitorThread.start();
    }
    
    // 优雅终止方法
    public void shutdownGracefully() {
        // 阶段1:设置终止标志
        shutdownRequested = true;
        
        // 阶段2:中断所有线程
        monitorThread.interrupt();
        workers.shutdown(); // 停止接收新任务
        
        try {
            // 等待现有任务完成(带超时)
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // 强制终止
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("系统已安全关闭");
    }
    
    // 提交任务方法
    public void submitTask(Runnable task) {
        if (!shutdownRequested) {
            workers.execute(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    if (shutdownRequested) {
                        System.out.println("任务被终止: " + e.getMessage());
                    }
                }
            });
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        TwoPhaseTermination system = new TwoPhaseTermination();
        system.start();
        
        // 模拟提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            system.submitTask(() -> {
                try {
                    Thread.sleep(500);
                    System.out.println("执行任务: " + taskId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 5秒后触发终止
        Thread.sleep(5000);
        system.shutdownGracefully();
    }
}

image.gif

2. 关键实现细节

// 双重终止检查(提高响应速度)
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
    // ...
}
// 资源清理模板
try {
    // 正常业务代码...
} finally {
    cleanupResources(); // 保证无论如何都会执行
}

image.gif


四、横向对比表格

1. 不同终止策略对比

终止方式

是否优雅

资源安全性

响应速度

实现复杂度

System.exit()

⚡️立即

暴力kill -9

⚡️立即

两阶段终止

⏳可控

超时强制终止

⚠️部分

⚠️可能泄漏

⏳可配置

中高

2. 中断处理方案对比

方案

适用场景

优点

缺点

标志位检查

简单循环任务

实现简单

阻塞操作无法响应

Thread.interrupt()

含阻塞操作的任务

能唤醒阻塞

需处理InterruptedException

Future.cancel()

线程池任务

与线程池集成好

无法自定义清理逻辑

Poison Pill

生产者-消费者模式

精确控制

需要特殊消息设计


五、高级实践技巧

1. 组合关闭多个服务

public void shutdownAll(ExecutorService... services) {
    // 阶段1:发送关闭信号
    for (ExecutorService service : services) {
        service.shutdown();
    }
    
    // 阶段2:等待终止
    for (ExecutorService service : services) {
        try {
            if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                service.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            service.shutdownNow();
        }
    }
}

image.gif

2. 带钩子的终止流程

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("执行JVM退出前的清理...");
    // 记录最后状态、关闭外部连接等
}));

image.gif

3. 分布式系统终止方案

┌───────────────┐    ┌──────────────────┐    ┌───────────────┐
│ 停止负载均衡  │───>│ 完成进行中请求   │───>│ 下线服务实例  │
└───────────────┘    └──────────────────┘    └───────────────┘

image.gif


通过这种 分层解析+生产级代码示例 的方式,可以全面掌握两阶段终止模式的实现要点,并能在实际系统中实现安全、可控的服务终止。

好的!我将延续原有结构,从 第六部分 开始扩展两阶段终止模式的进阶内容,保持技术深度和实用性的统一。


六、分布式场景下的两阶段终止(扩展)

1. 跨节点协调终止流程

┌───────────────┐    ┌──────────────────┐    ┌───────────────┐
│  协调者广播   │───>│ 各节点执行终止   │───>│ 全局状态确认  │
│  TERMINATE信号 │    │  (本地两阶段)    │    │   (ACK汇总)   │
└───────────────┘    └──────────────────┘    └───────────────┘

image.gif

  • 实现方案
  • 使用 ZooKeeper临时节点 作为协调器
  • 通过 Redis Pub/Sub 广播终止信号
  • 采用 Saga事务模式 保证跨服务一致性

2. 代码示例:基于ZooKeeper的实现

public class DistributedTermination {
    private final CuratorFramework zkClient;
    private final String servicePath;
    private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
    public DistributedTermination(String zkAddress, String serviceName) {
        this.zkClient = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        this.servicePath = "/services/" + serviceName;
        zkClient.start();
    }
    // 注册当前节点
    public void registerNode(String nodeId) throws Exception {
        zkClient.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(servicePath + "/" + nodeId);
    }
    // 分布式终止入口
    public void shutdownCluster() throws Exception {
        if (isShuttingDown.compareAndSet(false, true)) {
            // 阶段1:创建终止标记节点
            zkClient.create()
                .withMode(CreateMode.PERSISTENT)
                .forPath(servicePath + "/TERMINATE");
            // 阶段2:监听所有节点消失(确认终止完成)
            awaitTermination();
        }
    }
    // 节点自身的终止逻辑
    public void startShutdownListener() {
        PathChildrenCache watcher = new PathChildrenCache(zkClient, servicePath, true);
        watcher.getListenable().addListener((client, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED && 
                "TERMINATE".equals(event.getData().getPath())) {
                // 执行本地两阶段终止
                localShutdown();
            }
        });
    }
    private void awaitTermination() throws Exception {
        while (zkClient.getChildren().forPath(servicePath).size() > 1) {
            Thread.sleep(500);
        }
        zkClient.delete().forPath(servicePath + "/TERMINATE");
        System.out.println("集群终止完成");
    }
}

image.gif


七、性能优化与陷阱规避(扩展)

1. 关键性能指标监控

指标

监控方式

健康阈值

终止延迟

阶段1到阶段2的耗时统计

90%请求 < 2秒

资源释放率

文件句柄/连接池关闭验证

释放率 >= 99.9%

中断响应时间

从发送中断到线程停止的延迟

95%线程 < 500ms

2. 常见陷阱及解决方案

// 陷阱1:忘记恢复中断状态
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    // 错误做法:仅打印日志
    // log.error("Interrupted", e);
    
    // 正确做法:恢复中断状态
    Thread.currentThread().interrupt();
}
// 陷阱2:阻塞队列无法唤醒
BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
// 需要特殊唤醒方式
queue.put(POISON_PILL); // 投递毒丸对象
// 陷阱3:第三方库不响应中断
Future<?> future = executor.submit(() -> {
    // 使用非中断阻塞的JNI调用
    nativeBlockingCall();
});
future.cancel(true); // 可能无法真正终止

image.gif


八、与其他模式的协同应用(扩展)

1. 与断路器模式结合

┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│  终止信号触发  │───>│ 断路器打开状态 │───>│ 拒绝新请求     │
│  (Phase 1)    │    │  (快速失败)    │    │  (Phase 2前置) │
└───────────────┘    └───────────────┘    └───────────────┘

image.gif

  • 实现要点
  • 在阶段1开始时立即触发断路器
  • 在阶段2完成后重置断路器状态

2. 与Actor模型整合

// Akka示例:优雅终止Actor
actorSystem.registerOnTermination(() -> {
    // 阶段2的清理逻辑
    database.close();
});
// 发送终止命令
Patterns.gracefulStop(actorRef, Duration.ofSeconds(5), Shutdown.getInstance());

image.gif


九、生产环境检查清单

1. 终止流程验证步骤

  1. 模拟突然终止:kill -9 后验证资源泄漏
  2. 压力测试中触发终止:观察未完成请求处理情况
  3. 验证分布式场景下脑裂处理能力
  4. 检查监控系统是否能捕获异常终止事件

2. 关键日志记录点

// 阶段1日志标记
log.info("TERMINATION PHASE1 STARTED | Pending tasks: {}", queue.size());
// 阶段2关键操作
log.info("Releasing DB connections | Active: {}", pool.getActiveCount());
// 最终确认
log.info("TERMINATION COMPLETED | Time elapsed: {}ms", System.currentTimeMillis() - startTime);

image.gif


十、终极对比:各类终止策略

1. 单机 vs 分布式终止

维度

单机两阶段终止

分布式两阶段终止

信号传播方式

内存可见性/线程中断

集群广播/协调服务

完成确认机制

线程池awaitTermination

集群状态共识算法

典型耗时

毫秒~秒级

秒~分钟级

资源清理保证

进程内可控

依赖各节点实现

2. 超时配置黄金法则

终止超时时间 = Max(平均任务处理时间 × 3, 网络延迟 × 10)

image.gif

  • 示例计算
  • 平均任务处理时间:200ms
  • 跨机房延迟:50ms
  • 计算结果:Max(600ms, 500ms) = 600ms

通过这十个维度的系统化解析,两阶段终止模式从单机实现到分布式协同,从基础原理到生产实践的全貌已完整呈现。建议结合具体业务场景,灵活应用这些模式变体。

相关文章
|
9月前
|
设计模式 消息中间件 监控
并发设计模式实战系列(5):生产者/消费者
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第五章,废话不多说直接开始~
300 1
|
4月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
267 2
|
7月前
|
设计模式 C++
【实战指南】设计模式 - 工厂模式
工厂模式是一种面向对象设计模式,通过定义“工厂”来创建具体产品实例。它包含简单工厂、工厂方法和抽象工厂三种形式,分别适用于不同复杂度的场景。简单工厂便于理解但扩展性差;工厂方法符合开闭原则,适合单一类型产品创建;抽象工厂支持多类型产品创建,但不便于新增产品种类。三者各有优缺点,适用于不同设计需求。
299 62
|
9月前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
263 0
|
9月前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
275 0
|
9月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
570 0
|
9月前
|
设计模式 消息中间件 监控
并发设计模式实战系列(3):工作队列
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第三章,废话不多说直接开始~
244 0
|
5月前
|
设计模式 人工智能 算法
基于多设计模式的状态扭转设计:策略模式与责任链模式的实战应用
接下来,我会结合实战案例,聊聊如何用「策略模式 + 责任链模式」构建灵活可扩展的状态引擎,让抽奖系统的状态管理从「混乱战场」变成「有序流水线」。
|
9月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
272 16
|
9月前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
214 0