并发设计模式实战系列(15):Future/Promise

简介: 🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十五章,废话不多说直接开始~

 

image.gif 编辑

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十五章Future/Promise,废话不多说直接开始~

目录

一、核心原理深度拆解

1. 异步计算双阶段模型

2. 状态机流转

二、生活化类比:快递柜取件

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键机制说明

四、横向对比表格

1. 异步模式对比

2. 回调注册方式对比

五、高级应用技巧

1. 组合多个异步任务

2. 超时控制

3. 回调线程控制

六、源码级实现剖析(接五)

1. JDK FutureTask 核心逻辑

2. CompletableFuture 回调链实现

七、生产环境最佳实践

1. 异常处理模板

2. 资源清理策略

3. 性能监控指标

八、与其他模式的协作

1. 结合发布-订阅模式

2. 与反应式编程整合

九、各语言实现对比

十、常见陷阱与解决方案

1. 回调地狱问题

2. 线程泄漏场景

3. 上下文丢失问题


一、核心原理深度拆解

1. 异步计算双阶段模型

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Task      │───>│   Future    │───>│   Callback  │
│ Submission  │<───│  (Promise)  │<───│  Execution  │
└─────────────┘    └─────────────┘    └─────────────┘

image.gif

  • 提交阶段:主线程提交任务后立即返回Future占位符
  • 计算阶段:工作线程异步执行计算,通过Promise设置结果
  • 回调阶段:结果就绪后触发回调(观察者模式)

2. 状态机流转

public interface Future<V> {
    boolean isDone();      // 完成状态(成功/失败/取消)
    V get() throws...;     // 阻塞获取结果
    void addCallback(...); // 回调注册
}

image.gif


二、生活化类比:快递柜取件

系统组件

现实类比

核心行为

Future

快递柜取件码

凭码查询包裹是否到达

Promise

快递员存件操作

实际将包裹放入柜中并更新状态

Callback

短信通知服务

包裹入柜后自动发送取件提醒

  • 异步流程:下单→获得取件码(Future)→快递员送货(异步计算)→短信通知(Callback)

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.concurrent.*;
import java.util.function.Consumer;
public class FuturePromiseDemo {
    // 1. 自定义Promise实现
    static class MyPromise<V> implements Future<V>, Runnable {
        private volatile V result;
        private volatile Throwable error;
        private volatile boolean isDone;
        private final CountDownLatch latch = new CountDownLatch(1);
        private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();
        // 提交任务时执行的方法
        @Override
        public void run() {
            try {
                // 模拟耗时计算
                Thread.sleep(1000);
                setResult((V) "计算结果"); // 实际业务逻辑替换此处
            } catch (Exception e) {
                setError(e);
            }
        }
        // Promise核心方法:设置结果
        public void setResult(V result) {
            this.result = result;
            this.isDone = true;
            latch.countDown();
            notifyCallbacks();
        }
        // Promise核心方法:设置异常
        public void setError(Throwable error) {
            this.error = error;
            this.isDone = true;
            latch.countDown();
        }
        private void notifyCallbacks() {
            callbacks.forEach(cb -> cb.accept(result));
        }
        // Future实现方法
        @Override
        public V get() throws InterruptedException, ExecutionException {
            latch.await();
            if (error != null) throw new ExecutionException(error);
            return result;
        }
        @Override
        public boolean isDone() {
            return isDone;
        }
        // 注册回调(非JUC标准方法)
        public void addCallback(Consumer<V> callback) {
            if (isDone) {
                callback.accept(result);
            } else {
                callbacks.add(callback);
            }
        }
    }
    // 2. 使用示例
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 创建Promise并提交任务
        MyPromise<String> promise = new MyPromise<>();
        executor.submit(promise);
        // 注册回调
        promise.addCallback(result -> 
            System.out.println("[回调] 异步结果: " + result));
        // 同步阻塞获取
        System.out.println("[主线程] 立即返回,继续其他工作...");
        System.out.println("最终结果: " + promise.get());
        executor.shutdown();
    }
}

image.gif

2. 关键机制说明

// 1. 状态同步控制
private volatile boolean isDone;  // 保证可见性
private final CountDownLatch latch; // 实现阻塞等待
// 2. 线程安全回调列表
private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();
// 3. 异常处理流程
public void setError(Throwable error) {
    this.error = error;
    this.isDone = true;
    latch.countDown(); // 唤醒所有等待线程
}

image.gif


四、横向对比表格

1. 异步模式对比

模式

核心特点

适用场景

Java实现类

Future

阻塞式获取结果

简单异步任务

FutureTask

CompletableFuture

链式调用+组合操作

复杂异步流水线

CompletableFuture

Promise

可写的结果容器

跨线程结果传递

需自行实现

Callback

事件驱动无阻塞

高并发IO

Netty的ChannelFuture

2. 回调注册方式对比

方法

触发时机

线程安全性

链式支持

addCallback

结果就绪后立即执行

需自行保证

不支持

thenApply

前序阶段完成后触发

内置线程池控制

支持

whenComplete

无论成功失败都执行

可能在不同线程执行

支持


五、高级应用技巧

1. 组合多个异步任务

CompletableFuture<String> query1 = queryDatabase("sql1");
CompletableFuture<String> query2 = queryDatabase("sql2");
// 并行执行后合并结果
CompletableFuture<String> merged = query1.thenCombineAsync(query2, 
    (r1, r2) -> r1 + "|" + r2,
    ForkJoinPool.commonPool());

image.gif

2. 超时控制

Future<String> future = executor.submit(task);
try {
    String result = future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true); // 中断任务执行
}

image.gif

3. 回调线程控制

promise.addCallback(result -> {
    // 指定回调执行线程池
    ForkJoinPool.commonPool().execute(() -> processResult(result));
});

image.gif


通过这种 原理+实现+对比 的立体解析,可以掌握:

  1. Future/Promise的双阶段异步本质
  2. 如何实现生产级的Promise容器
  3. 不同异步模式的适用场景选择
  4. 复杂场景下的组合使用技巧

六、源码级实现剖析(接五)

1. JDK FutureTask 核心逻辑

// 状态机定义(OpenJDK 17)
private volatile int state;
static final int NEW          = 0; // 初始化状态
static final int COMPLETING   = 1; // 临时状态
static final int NORMAL       = 2; // 正常完成
static final int EXCEPTIONAL  = 3; // 异常完成
static final int CANCELLED    = 4; // 已取消
static final int INTERRUPTING = 5; // 中断中
static final int INTERRUPTED  = 6; // 已中断
// 结果存储设计
private Object outcome; // 非volatile,依赖状态可见性保证

image.gif

2. CompletableFuture 回调链实现

// 回调节点结构(简化版)
static final class UniCompletion<T,V> extends Completion {
    Executor executor;         // 执行线程池
    CompletableFuture<V> dep;   // 依赖的前序Future
    BiFunction<? super T,? super Throwable,? extends V> fn; // 回调函数
    void tryFire(int mode) {    // 触发回调执行
        if (dep != null && 
            compareAndSetState(0, 1)) { // CAS保证线程安全
            fn.apply(src, ex);  // 实际执行用户回调
        }
    }
}

image.gif


七、生产环境最佳实践

1. 异常处理模板

CompletableFuture.supplyAsync(() -> {
        // 业务逻辑
        return doSomething();
    })
    .exceptionally(ex -> {      // 捕获所有异常
        log.error("任务失败", ex);
        return defaultValue;    // 提供降级值
    })
    .thenAccept(result -> {     // 只处理成功情况
        updateUI(result); 
    });

image.gif

2. 资源清理策略

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 使用try-with-resources确保资源释放
        try (Connection conn = getConnection()) {
            process(conn);
        }
    }, executor);
    
    future.whenComplete((r, ex) -> {
        if (ex != null) {
            cleanupTempFiles(); // 失败时清理临时文件
        }
    });
} finally {
    executor.shutdown(); // 确保线程池关闭
}

image.gif

3. 性能监控指标

// 监控Future完成时长
Timer.Sample sample = Timer.start();
future.whenComplete((r, ex) -> {
    sample.stop(registry.timer("async.task.time"));
});
// 监控队列积压
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
metrics.gauge("task.queue.size", pool.getQueue()::size);

image.gif


八、与其他模式的协作

1. 结合发布-订阅模式

EventBus bus = new EventBus();
CompletableFuture.supplyAsync(() -> fetchData())
    .thenAccept(data -> {
        bus.post(new DataReadyEvent(data)); // 异步事件通知
    });
// 订阅方处理
@Subscribe
void handleDataReady(DataReadyEvent event) {
    // 处理已完成的数据
}

image.gif

2. 与反应式编程整合

// CompletableFuture -> Mono
Mono.fromFuture(() -> {
    return CompletableFuture.supplyAsync(() -> {
        return reactiveDao.query();
    });
}).subscribeOn(Schedulers.boundedElastic())
  .subscribe(System.out::println);
// Mono -> CompletableFuture
reactorMono.toFuture().thenApply(...);

image.gif


九、各语言实现对比

语言

核心实现类

特色功能

典型使用场景

Java

CompletableFuture

链式组合、CompletionStage

服务端异步编排

C#

Task

async/await语法糖

UI线程非阻塞调用

JavaScript

Promise

then/catch链式调用

前端API请求

Python

asyncio.Future

协程集成

爬虫/高并发IO

Go

chan

通道原生支持

高并发微服务


十、常见陷阱与解决方案

1. 回调地狱问题

反模式

future.thenApply(r1 -> {
    future2.thenApply(r2 -> {
        future3.thenApply(r3 -> {  // 嵌套层次过深
            return r1 + r2 + r3;
        });
    });
});

image.gif

解决方案

// 使用组合式编程
CompletableFuture.allOf(future1, future2, future3)
    .thenApply(v -> {
        return future1.join() + 
               future2.join() + 
               future3.join();
    });

image.gif

2. 线程泄漏场景

问题代码

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> {
    while (true) {  // 无限循环任务
        process();
    }
}, executor);  // 线程永远无法回收

image.gif

正确做法

// 使用守护线程或超时控制
ExecutorService executor = Executors.newFixedThreadPool(5, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);  // 设置为守护线程
    return t;
});

image.gif

3. 上下文丢失问题

问题现象

SecurityContext ctx = getContext();
CompletableFuture.runAsync(() -> {
    // 此处无法获取原始上下文
    doPrivilegedAction(); 
}, executor);

image.gif

解决方案

// 使用ContextPropagator
ExecutorService wrappedExecutor = ContextPropagator.wrap(executor);
CompletableFuture.runAsync(() -> {
    // 可以获取原始上下文
    doPrivilegedAction();
}, wrappedExecutor);

image.gif

目录
相关文章
|
30天前
|
设计模式 消息中间件 监控
并发设计模式实战系列(5):生产者/消费者
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第五章,废话不多说直接开始~
77 1
|
30天前
|
设计模式 负载均衡 监控
并发设计模式实战系列(2):领导者/追随者模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第二章领导者/追随者(Leader/Followers)模式,废话不多说直接开始~
51 0
|
30天前
|
设计模式 监控 Java
并发设计模式实战系列(1):半同步/半异步模式
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第一章半同步/半异步(Half-Sync/Half-Async)模式,废话不多说直接开始~
37 0
|
30天前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
145 0
|
30天前
|
设计模式 存储 安全
并发设计模式实战系列(7):Thread Local Storage (TLS)
🌟 大家好,我是摘星! 🌟今天为大家带来的是并发设计模式实战系列,第七章Thread Local Storage (TLS),废话不多说直接开始~
66 0
|
30天前
|
设计模式 消息中间件 监控
并发设计模式实战系列(3):工作队列
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第三章,废话不多说直接开始~
33 0
|
30天前
|
设计模式 监控 Java
并发设计模式实战系列(6):读写锁
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发设计模式实战系列,第六章,废话不多说直接开始~
32 0
|
1月前
|
设计模式 Java 数据库连接
【设计模式】【创建型模式】工厂方法模式(Factory Methods)
一、入门 什么是工厂方法模式? 工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,但由子类决定实例化哪个类。工厂方法模式使类的实例化延迟
79 16
|
30天前
|
设计模式 安全 Java
并发设计模式实战系列(12):不变模式(Immutable Object)
🌟 大家好,我是摘星!🌟今天为大家带来的是并发设计模式实战系列,第十二章,废话不多说直接开始~
32 0
|
20天前
|
设计模式 算法 Java
设计模式觉醒系列(04)策略模式|简单工厂模式的升级版
本文介绍了简单工厂模式与策略模式的概念及其融合实践。简单工厂模式用于对象创建,通过隐藏实现细节简化代码;策略模式关注行为封装与切换,支持动态替换算法,增强灵活性。两者结合形成“策略工厂”,既简化对象创建又保持低耦合。文章通过支付案例演示了模式的应用,并强调实际开发中应根据需求选择合适的设计模式,避免生搬硬套。最后推荐了JVM调优、并发编程等技术专题,助力开发者提升技能。

热门文章

最新文章