第五部分:消息队列 —— 异步削峰的解耦神器
消息队列(MQ)是分布式系统不可或缺的中间件,它通过异步通信实现应用解耦、流量削峰和最终一致性。
5.1 典型使用场景
异步处理:用户注册成功后发送邮件和短信,可放入队列,主流程快速返回。
应用解耦:订单系统发布“订单创建”事件,库存、积分等系统订阅处理,互不影响。
流量削峰:秒杀系统中,先将请求入队,后端 worker 按能力处理,避免 DB 被瞬时流量打死。
日志收集:各服务将日志发送到 MQ,再统一由 logstash 消费写入 ES。
5.2 主流 MQ 对比
5.3 可靠消息传递的关键概念
生产端确认:publisher confirm (RabbitMQ) / acks (Kafka) 确保消息成功到达 broker。
消费端确认:处理完消息后手动 ack,确保不丢失;若未 ack,消息可重新消费。
消息持久化:写入磁盘,broker 重启不丢失。
死信队列(DLQ):处理无法被正常消费的消息,便于排查和重试。
5.4 使用 RocketMQ 实现削峰(Java 示例)
生产者:秒杀请求入队
@PostMapping("/seckill")
public String seckill(Long userId, Long productId) {
// 简单校验库存(可用 Redis 预减)
String orderId = UUID.randomUUID().toString();
// 发送延迟消息(例如 10 秒后处理支付超时)
Message msg = new Message("SEC_KILL_TOPIC", "order", (userId+":"+productId).getBytes());
// 同步发送或异步
SendResult result = producer.send(msg);
return "排队中";
}
消费者:处理订单创建
@RocketMQMessageListener(topic = "SEC_KILL_TOPIC", consumerGroup = "order_consumer")
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 校验库存、创建订单、扣减数据库库存
// 注意幂等性(防止重复消费)
}
}
5.5 消息幂等性
由于 MQ 可能重发消息,消费者必须实现幂等(同一个消息被消费多次结果一样)。实现方式:
数据库唯一键约束:订单号作为唯一索引,重复插入会失败。
Redis 记录已处理的消息 ID:SETNX msgId 1 成功则处理,失败则跳过。
业务状态机:处理前检查状态(如订单已经是“已处理”状态则不再处理)。
第六部分:分布式事务 —— 跨数据的一致性难题
当业务涉及多个独立的数据源(如数据库、Redis、MQ)时,传统的数据库本地事务无法保证全局一致性。分布式事务方案在性能与一致性之间做出取舍。
6.1 两阶段提交(2PC)与三阶段提交(3PC)
2PC:协调者询问所有参与者是否准备好(Prepare),若全部同意则提交(Commit),否则回滚。缺点是同步阻塞、单点故障、数据不一致风险(Prepare 后协调者崩溃)。
3PC:引入超时机制和预提交阶段,减少了阻塞范围,但仍较复杂,实际应用较少。
典型实现:XA 协议(MySQL、Oracle 支持),性能较差,不适合高并发。
6.2 TCC(Try-Confirm-Cancel)
TCC 是一种补偿型事务,将业务操作拆分为三个阶段:
Try:预留资源(如冻结库存、预减余额)。
Confirm:确认执行(实际扣减、完成业务)。
Cancel:取消(释放预留资源)。
优点:性能较高,由业务层控制粒度。缺点:侵入性强,需要编写三个接口,且要处理幂等和悬挂问题。
示例:转账服务(A 转给 B 100 元)
// Try 阶段:冻结 A 的 100 元,增加 B 的预收资金
void tryTransfer(String from, String to, int amount);
// Confirm:实际扣减 A 的冻结资金,将 B 的预收转为可用余额
void confirmTransfer(String from, String to, int amount);
// Cancel:解冻 A 的资金,回退 B 的预收
void cancelTransfer(String from, String to, int amount);
实现时需要保证 Confirm/Cancel 的幂等性。
6.3 本地消息表 + MQ(最终一致性)
这是最常用的最终一致性方案。以订单创建和扣减库存为例:
订单服务在本地事务中:插入订单记录,同时插入一条消息记录(状态为“待发送”)。
异步任务轮询消息表,将消息发送到 MQ。
库存服务消费 MQ 消息,扣减库存。
库存处理成功后,通过回调接口通知订单服务更新消息状态为“已处理”。
若库存服务失败,可重试或转入人工处理。
优点:不依赖分布式事务,性能好。缺点:需要维护消息表,且至少需要保证 MQ 至少一次投递。
6.4 Seata 框架
Seata 是阿里开源的分布式事务解决方案,提供了 AT(自动补偿)、TCC、Saga 等模式。AT 模式基于数据源代理,自动生成回滚 SQL,无业务侵入。
# Spring Boot 中使用 Seata AT 模式
@GlobalTransactional
public void purchase(Long userId, Long productId, int count) {
orderService.createOrder(userId, productId, count); // 本地事务
stockService.deductStock(productId, count); // 远程调用
accountService.debitBalance(userId, count * price); // 远程调用
}
Seata 记录 Undo Log,若任一步骤失败,自动反向补偿。
来源:
https://hllft.cn/