一、问题背景
微服务架构下,跨服务的数据同步是经典难题。在Taocarts系统中,订单创建后需要同步到库存服务(扣减库存)、物流服务(生成运单)、通知服务(发送邮件)等多个下游系统。
如果采用强一致性的分布式事务(如2PC),性能和复杂性都会急剧上升。Taocarts选择的是“本地消息表+消息队列”方案,实现最终一致性。
二、本地消息表设计
sql
CREATE TABLE local_message ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL, topic VARCHAR(64) NOT NULL, payload TEXT NOT NULL, status TINYINT NOT NULL DEFAULT 0 COMMENT '0-待发送, 1-已发送, 2-发送失败', retry_count INT NOT NULL DEFAULT 0, max_retries INT NOT NULL DEFAULT 3, next_retry_time DATETIME DEFAULT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uk_message_id (message_id), KEY idx_status_next_retry (status, next_retry_time));
三、订单创建时写入本地消息
在订单创建的事务中同时写入本地消息记录:
java
@Service@Transactionalpublic class OrderService { public void createOrder(OrderDTO orderDTO) { // 1. 保存订单 Order order = new Order(); order.setOrderNo(generateOrderNo()); orderMapper.insert(order); // 2. 保存本地消息 LocalMessage msg = new LocalMessage(); msg.setMessageId(UUID.randomUUID().toString()); msg.setTopic("ORDER_CREATED"); msg.setPayload(JSON.toJSONString(order)); msg.setStatus(0); msg.setNextRetryTime(new Date()); messageMapper.insert(msg); }}
四、后台任务扫描并发送
java
@Componentpublic class MessageSendScheduler { @Scheduled(fixedDelay = 5000) public void sendPendingMessages() { List messages = messageMapper.selectPendingMessages(100); for (LocalMessage msg : messages) { try { SendResult result = rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload()); if (result.getSendStatus() == SendStatus.SEND_OK) { msg.setStatus(1); messageMapper.updateById(msg); } } catch (Exception e) { msg.setRetryCount(msg.getRetryCount() + 1); if (msg.getRetryCount() >= msg.getMaxRetries()) { msg.setStatus(2); alertService.send("消息进入死信队列: " + msg.getMessageId()); } else { long delayMinutes = 1L << msg.getRetryCount(); msg.setNextRetryTime(new Date(System.currentTimeMillis() + delayMinutes 60 1000)); } messageMapper.updateById(msg); } } }}
五、消费端幂等处理
java
@Component@RocketMQMessageListener(topic = "ORDER_CREATED", consumerGroup = "inventory-consumer")public class InventoryConsumer implements RocketMQListener { @Override public void onMessage(String message) { String messageId = JSON.parseObject(message).getString("messageId"); String key = "processed:" + messageId; Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(24)); if (Boolean.FALSE.equals(success)) { log.info("消息已处理过,跳过: {}", messageId); return; } try { inventoryService.deductStock(...); } catch (Exception e) { redisTemplate.delete(key); throw e; } }}
六、总结
Taocarts通过本地消息表+消息队列成功解耦了订单创建与下游服务的依赖,消息送达率达到99.99%。核心经验是:分布式系统要接受“最终一致性”,而不是追求“强一致性”。