编辑
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第九章消息传递(Message Passing),废话不多说直接开始~
目录
一、核心原理深度拆解
1. 消息传递架构
┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ Producer │───> │ Message Queue │───> │ Consumer │ │ (并发发送消息) │<─── │ (线程安全缓冲) │<─── │ (并发处理消息) │ └───────────────┘ └───────────────┘ └───────────────┘
- 生产者-消费者解耦:通过消息队列实现松耦合
- 线程安全通信:消息队列作为共享数据的唯一通道
- 流量控制:队列容量限制防止系统过载
2. 并发控制关键
- 无共享状态:各线程通过消息通信而非共享内存
- 异步处理:生产者不等待消费者处理完成
- 背压机制:队列满时生产者阻塞或采取其他策略
二、生活化类比:邮局系统
系统组件 |
现实类比 |
核心行为 |
Producer |
寄信人 |
写好信投入邮箱 |
Message Queue |
邮局分拣中心 |
暂存和分类信件 |
Consumer |
邮递员 |
按区域派送信件 |
- 突发流量处理:节假日大量信件 → 邮局暂存 → 邮递员按能力派送
- 失败处理:无法投递的信件退回或进入死信队列
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.concurrent.*; import java.util.function.Consumer; public class MessagePassingSystem<T> { // 消息队列(设置容量防止OOM) private final BlockingQueue<Message<T>> messageQueue; // 消费者线程池 private final ExecutorService consumerPool; // 消息封装 static class Message<T> { final T content; final Consumer<T> onSuccess; final Consumer<Exception> onError; Message(T content, Consumer<T> onSuccess, Consumer<Exception> onError) { this.content = content; this.onSuccess = onSuccess; this.onError = onError; } } public MessagePassingSystem(int queueSize, int consumerThreads) { this.messageQueue = new LinkedBlockingQueue<>(queueSize); this.consumerPool = Executors.newFixedThreadPool(consumerThreads, r -> { Thread t = new Thread(r); t.setDaemon(true); return t; }); // 启动消费者 startConsumers(); } // 生产者API public void send(T message, Consumer<T> onSuccess, Consumer<Exception> onError) throws InterruptedException { Message<T> msg = new Message<>(message, onSuccess, onError); messageQueue.put(msg); // 阻塞直到队列有空位 } // 消费者处理 private void startConsumers() { for (int i = 0; i < consumerPool.getCorePoolSize(); i++) { consumerPool.execute(() -> { while (!Thread.currentThread().isInterrupted()) { try { Message<T> message = messageQueue.take(); processMessage(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); } } private void processMessage(Message<T> message) { try { // 模拟业务处理 System.out.println("处理消息: " + message.content); Thread.sleep(100); // 模拟处理耗时 // 处理成功回调 if (message.onSuccess != null) { message.onSuccess.accept(message.content); } } catch (Exception e) { // 处理失败回调 if (message.onError != null) { message.onError.accept(e); } } } public void shutdown() { consumerPool.shutdown(); } public static void main(String[] args) throws Exception { // 创建消息系统:队列容量100,4个消费者线程 MessagePassingSystem<String> mps = new MessagePassingSystem<>(100, 4); // 模拟生产者 ExecutorService producers = Executors.newFixedThreadPool(8); for (int i = 0; i < 100; i++) { final int msgId = i; producers.execute(() -> { try { mps.send("消息-" + msgId, result -> System.out.println("处理成功: " + result), error -> System.err.println("处理失败: " + msgId + ", " + error) ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } producers.shutdown(); producers.awaitTermination(1, TimeUnit.MINUTES); mps.shutdown(); } }
2. 关键配置说明
// 队列选择策略 BlockingQueue<Message<T>> queue = new LinkedBlockingQueue<>(100); // 有界队列 // 或 BlockingQueue<Message<T>> queue = new ArrayBlockingQueue<>(100); // 固定大小 // 消费者线程池配置 ExecutorService consumerPool = new ThreadPoolExecutor( 4, // 核心线程数 8, // 最大线程数 60, TimeUnit.SECONDS, new SynchronousQueue<>(), // 直接传递 new ThreadPoolExecutor.CallerRunsPolicy() // 饱和策略 );
四、横向对比表格
1. 并发通信模式对比
模式 |
数据共享方式 |
线程安全保证 |
适用场景 |
共享内存 |
直接内存访问 |
需显式同步 |
高性能计算 |
消息传递 |
通过消息队列 |
队列内部保证 |
分布式/并发系统 |
Actor模型 |
通过消息邮箱 |
每个Actor单线程 |
高并发系统 |
数据流 |
通过流管道 |
管道内部保证 |
流处理系统 |
2. 消息队列实现对比
实现方式 |
特点 |
适用场景 |
BlockingQueue |
JVM内内存队列 |
单机多线程通信 |
Kafka/RabbitMQ |
分布式持久化队列 |
分布式系统 |
Disruptor |
高性能无锁队列 |
低延迟高吞吐系统 |
ZeroMQ |
网络消息库 |
跨进程通信 |
五、高级优化技巧
1. 批量消息处理
// 批量消费消息提升吞吐量 List<Message<T>> batch = new ArrayList<>(100); messageQueue.drainTo(batch, 100); // 批量取出 if (!batch.isEmpty()) { consumerPool.execute(() -> processBatch(batch)); }
2. 优先级消息处理
// 使用优先级队列 BlockingQueue<Message<T>> queue = new PriorityBlockingQueue<>(100, (m1, m2) -> m1.priority - m2.priority );
3. 死信队列处理
// 失败消息转入死信队列 private final BlockingQueue<Message<T>> deadLetterQueue = new LinkedBlockingQueue<>(); private void processMessage(Message<T> message) { try { // ...正常处理 } catch (Exception e) { deadLetterQueue.put(message); // 转入死信队列 } }
4. 监控指标
// 监控队列积压 int backlog = messageQueue.size(); // 监控处理延迟 long enqueueTime = System.currentTimeMillis(); // 在消息处理时记录延迟 long latency = System.currentTimeMillis() - message.enqueueTime;
六、分布式消息传递系统设计
1. 跨节点通信架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Node 1 │───>│ Message │───>│ Node 2 │ │ (Producer) │<───│ Broker │<───│ (Consumer) │ └─────────────┘ └─────────────┘ └─────────────┘
- 消息代理中间件:Kafka/RabbitMQ/RocketMQ
- 序列化协议:Protobuf/Avro/JSON
- 网络传输:TCP长连接/HTTP2/gRPC
2. 关键设计考量
// 分布式消息发送示例(伪代码) public class DistributedSender { private final MessageQueueClient client; public void send(String topic, byte[] payload) { // 消息属性设置 Message msg = new Message(topic, payload, System.currentTimeMillis(), DeliveryGuarantee.AT_LEAST_ONCE ); // 异步发送+回调 client.sendAsync(msg, new Callback() { @Override public void onComplete(Result result) { if(!result.isSuccess()) { // 重试或记录死信 retryOrDeadLetter(msg); } } }); } }
七、消息模式高级变体
1. 发布/订阅模式实现
// 基于Topic的消息路由 public class PubSubSystem { private final Map<String, List<Consumer>> topicSubscribers = new ConcurrentHashMap<>(); public void subscribe(String topic, Consumer consumer) { topicSubscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()) .add(consumer); } public void publish(String topic, Message message) { List<Consumer> consumers = topicSubscribers.get(topic); if(consumers != null) { consumers.forEach(c -> c.accept(message)); } } }
2. 请求-响应模式实现
// 带关联ID的请求响应处理 public class RequestReplySystem { private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>(); private final ConcurrentMap<String, CompletableFuture<Response>> pendingRequests = new ConcurrentHashMap<>(); // 请求方 public CompletableFuture<Response> request(Request req) { CompletableFuture<Response> future = new CompletableFuture<>(); pendingRequests.put(req.getCorrelationId(), future); requestQueue.offer(req); return future; } // 响应处理线程 private void processResponse(Response resp) { CompletableFuture<Response> future = pendingRequests.remove(resp.getCorrelationId()); if(future != null) { future.complete(resp); } } }
八、性能优化深度策略
1. 零拷贝传输优化
// 使用ByteBuffer减少内存拷贝 public class ZeroCopyMessage { private final ByteBuffer payload; public void send(SocketChannel channel) throws IOException { while(payload.hasRemaining()) { channel.write(payload); } } }
2. 批处理与压缩
// 消息批量压缩处理 public class BatchCompressor { public byte[] compressBatch(List<Message> messages) { try(ByteArrayOutputStream baos = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(baos)) { for(Message msg : messages) { gzip.write(msg.serialize()); } gzip.finish(); return baos.toByteArray(); } } }
九、容错与可靠性保障
1. 消息持久化方案
// 基于WAL的持久化存储 public class MessageJournal { private final RandomAccessFile journalFile; private final AtomicLong writeOffset = new AtomicLong(0); public void append(Message msg) throws IOException { byte[] data = msg.serialize(); synchronized(journalFile) { journalFile.seek(writeOffset.get()); journalFile.writeInt(data.length); journalFile.write(data); journalFile.getFD().sync(); writeOffset.addAndGet(4 + data.length); } } }
2. 事务消息处理
// 两阶段提交实现 public class TransactionalProcessor { public void processWithTransaction(Message msg) { try { // 阶段1:预备 boolean prepared = prepareResources(msg); // 阶段2:提交/回滚 if(prepared) { commitTransaction(msg); sendAck(msg.getId()); } else { rollbackTransaction(msg); sendNack(msg.getId()); } } catch(Exception e) { // 补偿处理 compensateTransaction(msg); } } }
十、现代消息模式演进
1. 响应式消息流
// 基于Reactive Streams的实现 public class ReactiveMessageSystem { private final Flux<Message> messageStream; public ReactiveMessageSystem(MessageSource source) { this.messageStream = Flux.fromIterable(source) .publishOn(Schedulers.parallel()) .filter(msg -> isValid(msg)) .map(this::transform); } public void subscribe(Consumer<Message> subscriber) { messageStream.subscribe(subscriber); } }
2. 事件溯源模式
// 基于事件日志的状态重建 public class EventSourcedRepository { private final EventStore eventStore; public Entity get(String id) { List<Event> events = eventStore.getEvents(id); return recreateEntity(events); } private Entity recreateEntity(List<Event> events) { Entity entity = new Entity(); events.forEach(e -> entity.apply(e)); return entity; } }
十一、性能监控指标体系
1. 关键监控指标
指标类别 |
具体指标 |
采集方式 |
吞吐量 |
消息发送/消费速率(msg/s) |
计数器+时间窗口 |
延迟 |
端到端处理延迟(ms) |
时间戳差值统计 |
可靠性 |
消息投递成功率(%) |
成功/失败计数器 |
资源使用 |
内存/CPU/网络占用 |
系统监控接口 |
2. 健康检查实现
public class HealthChecker { private final MessageQueue queue; private final ThreadPoolExecutor executor; public HealthCheckResult check() { return new HealthCheckResult( queue.size() < queue.capacity() * 0.9, executor.getActiveCount() < executor.getMaximumPoolSize(), System.currentTimeMillis() - lastMessageTime < 5000 ); } }
十二、典型应用场景案例
1. 电商订单处理系统
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 订单创建服务 │───>│ 订单消息队列 │───>│ 订单处理服务 │ └─────────────┘ └─────────────┘ └─────────────┘ ↓ ┌─────────────┐ │ 支付服务 │ └─────────────┘
2. 物联网数据处理
// 设备消息处理流水线 public class IoTMessagePipeline { public void buildPipeline() { // 1. 设备原始数据接收 MessageSource source = new MqttSource(); // 2. 构建处理流水线 Pipeline pipeline = new PipelineBuilder() .addStage(new DataValidation()) .addStage(new DataNormalization()) .addStage(new AnomalyDetection()) .addStage(new StorageWriter()) .build(); // 3. 启动处理 source.subscribe(pipeline::process); } }