问题
我们在下单的时候,经常会使用mq发送一个异步消息,然后扣减库存,那么在这个过程中可能会发生 异步消息发送失败,扣减库存执行失败,导致下单失败无法正确回补库存,针对这种不一致性问题,可以使用事务型消息。
事务型消息生产者
@Component public class MqProducer { private DefaultMQProducer producer; // 默认生产者 private TransactionMQProducer transactionMQProducer; // 事务型生产者 @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private OrderService orderService; @Autowired private StockLogDOMapper stockLogDOMapper; @PostConstruct public void init() throws MQClientException { //做mq producer的初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); transactionMQProducer = new TransactionMQProducer("transaction_producer_group"); transactionMQProducer.setNamesrvAddr(nameAddr); transactionMQProducer.start(); transactionMQProducer.setTransactionListener(new TransactionListener() { // 第2步执行 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //真正要做的事 创建订单 //arg的值来自 transactionAsyncReduceStock方法中的argsMap Integer itemId = (Integer) ((Map)arg).get("itemId"); Integer promoId = (Integer) ((Map)arg).get("promoId"); Integer userId = (Integer) ((Map)arg).get("userId"); Integer amount = (Integer) ((Map)arg).get("amount"); String stockLogId = (String) ((Map)arg).get("stockLogId"); try { orderService.createOrder(userId,itemId,promoId,amount,stockLogId); } catch (Exception e) { e.printStackTrace(); //设置对应的stockLog为回滚状态 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); stockLogDO.setStatus(3); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } // 第2步执行出错后,没有明确返回COMMIT_MESSAGE或ROLLBACK_MESSAGE的时候 执行 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOWN String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); String stockLogId = (String) map.get("stockLogId"); // 查询操作流水,看这笔交易是否成功 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null){ return LocalTransactionState.UNKNOW; } if(stockLogDO.getStatus().intValue() == 2){ return LocalTransactionState.COMMIT_MESSAGE; }else if(stockLogDO.getStatus().intValue() == 1){ return LocalTransactionState.UNKNOW; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); } //事务型同步库存扣减消息 第1步执行 public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId,Integer amount,String stockLogId){ Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); bodyMap.put("stockLogId",stockLogId); Map<String,Object> argsMap = new HashMap<>(); argsMap.put("itemId",itemId); argsMap.put("amount",amount); argsMap.put("userId",userId); argsMap.put("promoId",promoId); argsMap.put("stockLogId",stockLogId); Message message = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); TransactionSendResult sendResult = null; try { //这里的artsMap,会传到 executeLocalTransaction(Message msg, Object arg)中的arg sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } if(sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){ return false; }else if(sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){ return true; }else{ return false; } } }
事务型消息消费者
@Component public class MqConsumer { private DefaultMQPushConsumer consumer; @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private ItemStockDOMapper itemStockDOMapper; @PostConstruct public void init() throws MQClientException { consumer = new DefaultMQPushConsumer("stock_consumer_group"); consumer.setNamesrvAddr(nameAddr); consumer.subscribe(topicName,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //实现库存真正到数据库内扣减的逻辑 Message msg = msgs.get(0); String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); itemStockDOMapper.decreaseStock(itemId,amount); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
下单的Controller
@RequestMapping(value = "/createorder",method = {RequestMethod.POST},consumes={CONTENT_TYPE_FORMED}) @ResponseBody public CommonReturnType createOrder(@RequestParam(name="itemId")Integer itemId, @RequestParam(name="amount")Integer amount, @RequestParam(name="promoId",required = false)Integer promoId, @RequestParam(name="promoToken",required = false)String promoToken) throws BusinessException { // 限流 boolean tryAcquire = orderCreateRateLimiter.tryAcquire(); if(!tryAcquire){ throw new BusinessException(EmBusinessError.RATELIMIT); } String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //获取用户的登陆信息 UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //校验秒杀令牌是否正确 if(promoId != null){ String inRedisPromoToken = (String) redisTemplate.opsForValue().get("promo_token_"+promoId+"_userid_"+userModel.getId()+"_itemid_"+itemId); if(inRedisPromoToken == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } if(!org.apache.commons.lang3.StringUtils.equals(promoToken,inRedisPromoToken)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } } //同步调用线程池的submit方法 //拥塞窗口为20的等待队列,用来队列化泄洪 Future<Object> future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { //加入库存流水init状态,这是为了MqProducer类中checkLocalTransaction方法 查看扣减库存成功与否 String stockLogId = itemService.initStockLog(itemId,amount); //再去完成对应的下单事务型消息机制 if(!mqProducer.transactionAsyncReduceStock(userModel.getId(),itemId,promoId,amount,stockLogId)){ throw new BusinessException(EmBusinessError.UNKNOWN_ERROR,"下单失败"); } return null; } }); try { future.get(); } catch (InterruptedException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } catch (ExecutionException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } return CommonReturnType.create(null); }
订单实现类
@Override @Transactional public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount,String stockLogId) throws Exception { //1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确 //ItemModel itemModel = itemService.getItemById(itemId); ItemModel itemModel = itemService.getItemByIdInCache(itemId); if(itemModel == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在"); } // // UserModel userModel = userService.getUserByIdInCache(userId); // if(userModel == null){ // throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用户信息不存在"); // } if(amount <= 0 || amount > 99){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"数量信息不正确"); } //校验活动信息 // if(promoId != null){ // //(1)校验对应活动是否存在这个适用商品 // if(promoId.intValue() != itemModel.getPromoModel().getId()){ // throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息不正确"); // //(2)校验活动是否正在进行中 // }else if(itemModel.getPromoModel().getStatus().intValue() != 2) { // throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息还未开始"); // } // } //2.落单减库存 boolean result = itemService.decreaseStock(itemId,amount); if(!result){ throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH); } //3.订单入库 OrderModel orderModel = new OrderModel(); orderModel.setUserId(userId); orderModel.setItemId(itemId); orderModel.setAmount(amount); if(promoId != null){ orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice()); }else{ orderModel.setItemPrice(itemModel.getPrice()); } orderModel.setPromoId(promoId); orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount))); //生成交易流水号,订单号 orderModel.setId(generateOrderNo()); OrderDO orderDO = convertFromOrderModel(orderModel); orderDOMapper.insertSelective(orderDO); //加上商品的销量 itemService.increaseSales(itemId,amount); //设置库存流水状态为成功 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null){ throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } stockLogDO.setStatus(2); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); //4.返回前端 return orderModel; }