编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第十五章Future/Promise,废话不多说直接开始~
目录
一、核心原理深度拆解
1. 异步计算双阶段模型
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Task │───>│ Future │───>│ Callback │ │ Submission │<───│ (Promise) │<───│ Execution │ └─────────────┘ └─────────────┘ └─────────────┘
- 提交阶段:主线程提交任务后立即返回Future占位符
- 计算阶段:工作线程异步执行计算,通过Promise设置结果
- 回调阶段:结果就绪后触发回调(观察者模式)
2. 状态机流转
public interface Future<V> { boolean isDone(); // 完成状态(成功/失败/取消) V get() throws...; // 阻塞获取结果 void addCallback(...); // 回调注册 }
二、生活化类比:快递柜取件
系统组件 |
现实类比 |
核心行为 |
Future |
快递柜取件码 |
凭码查询包裹是否到达 |
Promise |
快递员存件操作 |
实际将包裹放入柜中并更新状态 |
Callback |
短信通知服务 |
包裹入柜后自动发送取件提醒 |
- 异步流程:下单→获得取件码(Future)→快递员送货(异步计算)→短信通知(Callback)
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.concurrent.*; import java.util.function.Consumer; public class FuturePromiseDemo { // 1. 自定义Promise实现 static class MyPromise<V> implements Future<V>, Runnable { private volatile V result; private volatile Throwable error; private volatile boolean isDone; private final CountDownLatch latch = new CountDownLatch(1); private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>(); // 提交任务时执行的方法 @Override public void run() { try { // 模拟耗时计算 Thread.sleep(1000); setResult((V) "计算结果"); // 实际业务逻辑替换此处 } catch (Exception e) { setError(e); } } // Promise核心方法:设置结果 public void setResult(V result) { this.result = result; this.isDone = true; latch.countDown(); notifyCallbacks(); } // Promise核心方法:设置异常 public void setError(Throwable error) { this.error = error; this.isDone = true; latch.countDown(); } private void notifyCallbacks() { callbacks.forEach(cb -> cb.accept(result)); } // Future实现方法 @Override public V get() throws InterruptedException, ExecutionException { latch.await(); if (error != null) throw new ExecutionException(error); return result; } @Override public boolean isDone() { return isDone; } // 注册回调(非JUC标准方法) public void addCallback(Consumer<V> callback) { if (isDone) { callback.accept(result); } else { callbacks.add(callback); } } } // 2. 使用示例 public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); // 创建Promise并提交任务 MyPromise<String> promise = new MyPromise<>(); executor.submit(promise); // 注册回调 promise.addCallback(result -> System.out.println("[回调] 异步结果: " + result)); // 同步阻塞获取 System.out.println("[主线程] 立即返回,继续其他工作..."); System.out.println("最终结果: " + promise.get()); executor.shutdown(); } }
2. 关键机制说明
// 1. 状态同步控制 private volatile boolean isDone; // 保证可见性 private final CountDownLatch latch; // 实现阻塞等待 // 2. 线程安全回调列表 private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>(); // 3. 异常处理流程 public void setError(Throwable error) { this.error = error; this.isDone = true; latch.countDown(); // 唤醒所有等待线程 }
四、横向对比表格
1. 异步模式对比
模式 |
核心特点 |
适用场景 |
Java实现类 |
Future |
阻塞式获取结果 |
简单异步任务 |
FutureTask |
CompletableFuture |
链式调用+组合操作 |
复杂异步流水线 |
CompletableFuture |
Promise |
可写的结果容器 |
跨线程结果传递 |
需自行实现 |
Callback |
事件驱动无阻塞 |
高并发IO |
Netty的ChannelFuture |
2. 回调注册方式对比
方法 |
触发时机 |
线程安全性 |
链式支持 |
addCallback |
结果就绪后立即执行 |
需自行保证 |
不支持 |
thenApply |
前序阶段完成后触发 |
内置线程池控制 |
支持 |
whenComplete |
无论成功失败都执行 |
可能在不同线程执行 |
支持 |
五、高级应用技巧
1. 组合多个异步任务
CompletableFuture<String> query1 = queryDatabase("sql1"); CompletableFuture<String> query2 = queryDatabase("sql2"); // 并行执行后合并结果 CompletableFuture<String> merged = query1.thenCombineAsync(query2, (r1, r2) -> r1 + "|" + r2, ForkJoinPool.commonPool());
2. 超时控制
Future<String> future = executor.submit(task); try { String result = future.get(2, TimeUnit.SECONDS); } catch (TimeoutException e) { future.cancel(true); // 中断任务执行 }
3. 回调线程控制
promise.addCallback(result -> { // 指定回调执行线程池 ForkJoinPool.commonPool().execute(() -> processResult(result)); });
通过这种 原理+实现+对比 的立体解析,可以掌握:
- Future/Promise的双阶段异步本质
- 如何实现生产级的Promise容器
- 不同异步模式的适用场景选择
- 复杂场景下的组合使用技巧
六、源码级实现剖析(接五)
1. JDK FutureTask 核心逻辑
// 状态机定义(OpenJDK 17) private volatile int state; static final int NEW = 0; // 初始化状态 static final int COMPLETING = 1; // 临时状态 static final int NORMAL = 2; // 正常完成 static final int EXCEPTIONAL = 3; // 异常完成 static final int CANCELLED = 4; // 已取消 static final int INTERRUPTING = 5; // 中断中 static final int INTERRUPTED = 6; // 已中断 // 结果存储设计 private Object outcome; // 非volatile,依赖状态可见性保证
2. CompletableFuture 回调链实现
// 回调节点结构(简化版) static final class UniCompletion<T,V> extends Completion { Executor executor; // 执行线程池 CompletableFuture<V> dep; // 依赖的前序Future BiFunction<? super T,? super Throwable,? extends V> fn; // 回调函数 void tryFire(int mode) { // 触发回调执行 if (dep != null && compareAndSetState(0, 1)) { // CAS保证线程安全 fn.apply(src, ex); // 实际执行用户回调 } } }
七、生产环境最佳实践
1. 异常处理模板
CompletableFuture.supplyAsync(() -> { // 业务逻辑 return doSomething(); }) .exceptionally(ex -> { // 捕获所有异常 log.error("任务失败", ex); return defaultValue; // 提供降级值 }) .thenAccept(result -> { // 只处理成功情况 updateUI(result); });
2. 资源清理策略
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); try { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 使用try-with-resources确保资源释放 try (Connection conn = getConnection()) { process(conn); } }, executor); future.whenComplete((r, ex) -> { if (ex != null) { cleanupTempFiles(); // 失败时清理临时文件 } }); } finally { executor.shutdown(); // 确保线程池关闭 }
3. 性能监控指标
// 监控Future完成时长 Timer.Sample sample = Timer.start(); future.whenComplete((r, ex) -> { sample.stop(registry.timer("async.task.time")); }); // 监控队列积压 ThreadPoolExecutor pool = (ThreadPoolExecutor) executor; metrics.gauge("task.queue.size", pool.getQueue()::size);
八、与其他模式的协作
1. 结合发布-订阅模式
EventBus bus = new EventBus(); CompletableFuture.supplyAsync(() -> fetchData()) .thenAccept(data -> { bus.post(new DataReadyEvent(data)); // 异步事件通知 }); // 订阅方处理 @Subscribe void handleDataReady(DataReadyEvent event) { // 处理已完成的数据 }
2. 与反应式编程整合
// CompletableFuture -> Mono Mono.fromFuture(() -> { return CompletableFuture.supplyAsync(() -> { return reactiveDao.query(); }); }).subscribeOn(Schedulers.boundedElastic()) .subscribe(System.out::println); // Mono -> CompletableFuture reactorMono.toFuture().thenApply(...);
九、各语言实现对比
语言 |
核心实现类 |
特色功能 |
典型使用场景 |
Java |
CompletableFuture |
链式组合、CompletionStage |
服务端异步编排 |
C# |
Task |
async/await语法糖 |
UI线程非阻塞调用 |
JavaScript |
Promise |
then/catch链式调用 |
前端API请求 |
Python |
asyncio.Future |
协程集成 |
爬虫/高并发IO |
Go |
chan |
通道原生支持 |
高并发微服务 |
十、常见陷阱与解决方案
1. 回调地狱问题
反模式:
future.thenApply(r1 -> { future2.thenApply(r2 -> { future3.thenApply(r3 -> { // 嵌套层次过深 return r1 + r2 + r3; }); }); });
解决方案:
// 使用组合式编程 CompletableFuture.allOf(future1, future2, future3) .thenApply(v -> { return future1.join() + future2.join() + future3.join(); });
2. 线程泄漏场景
问题代码:
ExecutorService executor = Executors.newFixedThreadPool(5); CompletableFuture.runAsync(() -> { while (true) { // 无限循环任务 process(); } }, executor); // 线程永远无法回收
正确做法:
// 使用守护线程或超时控制 ExecutorService executor = Executors.newFixedThreadPool(5, r -> { Thread t = new Thread(r); t.setDaemon(true); // 设置为守护线程 return t; });
3. 上下文丢失问题
问题现象:
SecurityContext ctx = getContext(); CompletableFuture.runAsync(() -> { // 此处无法获取原始上下文 doPrivilegedAction(); }, executor);
解决方案:
// 使用ContextPropagator ExecutorService wrappedExecutor = ContextPropagator.wrap(executor); CompletableFuture.runAsync(() -> { // 可以获取原始上下文 doPrivilegedAction(); }, wrappedExecutor);