失败补偿机制
消息发送方
- 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
- 注入模板类和属性值信息
// RocketMQ template injected by Spring; sendMessage obtains the producer from it.
@Autowired
private RocketMQTemplate rocketMQTemplate;

// Topic the order messages are published to, read from the mq.order.topic property.
@Value("${mq.order.topic}")
private String topic;

// Tag attached to order-cancel messages, read from the mq.order.tag.cancel property.
@Value("${mq.order.tag.cancel}")
private String cancelTag;
复制代码
- 发送下单失败消息
@Override public Result confirmOrder(TradeOrder order) { //1.校验订单 //2.生成预订 try { //3.扣减库存 //4.扣减优惠券 //5.使用余额 //6.确认订单 } catch (Exception e) { //确认订单失败,发送消息 CancelOrderMQ cancelOrderMQ = new CancelOrderMQ(); cancelOrderMQ.setOrderId(order.getOrderId()); cancelOrderMQ.setCouponId(order.getCouponId()); cancelOrderMQ.setGoodsId(order.getGoodsId()); cancelOrderMQ.setGoodsNumber(order.getGoodsNumber()); cancelOrderMQ.setUserId(order.getUserId()); cancelOrderMQ.setUserMoney(order.getMoneyPaid()); try { sendMessage(topic, cancelTag, cancelOrderMQ.getOrderId().toString(), JSON.toJSONString(cancelOrderMQ)); } catch (Exception e1) { e1.printStackTrace(); CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL); } return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage()); } } 复制代码
private void sendMessage(String topic, String tags, String keys, String body) throws Exception { //判断Topic是否为空 if (StringUtils.isEmpty(topic)) { CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY); } //判断消息内容是否为空 if (StringUtils.isEmpty(body)) { CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY); } //消息体 Message message = new Message(topic, tags, keys, body.getBytes()); //发送消息 rocketMQTemplate.getProducer().send(message); } 复制代码
消息接收方
- 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
创建监听类,消费消息
/**
 * Skeleton of a RocketMQ listener for the order topic.
 *
 * <p>The topic and consumer group are resolved from the {@code mq.order.topic}
 * and {@code mq.order.consumer.group.name} properties, and the listener is
 * registered with the BROADCASTING message model.
 *
 * <p>NOTE(review): no tag selector is configured, so this listener presumably
 * receives every message on the topic, not only cancel messages; confirm that
 * this is intended.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
    /**
     * Callback invoked with each received message.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        // The handling logic is elided in this skeleton.
        ...
    }
}
复制代码
回退库存
网络异常,图片无法展示
|
消息消费者
@Slf4j @Component @RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING ) public class CancelMQListener implements RocketMQListener<MessageExt>{ @Value("${mq.order.consumer.group.name}") private String groupName; @Autowired private TradeGoodsMapper goodsMapper; @Autowired private TradeMqConsumerLogMapper mqConsumerLogMapper; @Autowired private TradeGoodsNumberLogMapper goodsNumberLogMapper; @Override public void onMessage(MessageExt messageExt) { String msgId=null; String tags=null; String keys=null; String body=null; try { //1. 解析消息内容 msgId = messageExt.getMsgId(); tags= messageExt.getTags(); keys= messageExt.getKeys(); body= new String(messageExt.getBody(),"UTF-8"); log.info("接受消息成功"); //2. 查询消息消费记录 TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey(); primaryKey.setMsgTag(tags); primaryKey.setMsgKey(keys); primaryKey.setGroupName(groupName); TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey); if(mqConsumerLog!=null){ //3. 判断如果消费过... 
//3.1 获得消息处理状态 Integer status = mqConsumerLog.getConsumerStatus(); //处理过...返回 if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){ log.info("消息:"+msgId+",已经处理过"); return; } //正在处理...返回 if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){ log.info("消息:"+msgId+",正在处理"); return; } //处理失败 if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){ //获得消息处理次数 Integer times = mqConsumerLog.getConsumerTimes(); if(times>3){ log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了"); return; } mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode()); //使用数据库乐观锁更新 TradeMqConsumerLogExample example = new TradeMqConsumerLogExample(); TradeMqConsumerLogExample.Criteria criteria = example.createCriteria(); criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag()); criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey()); criteria.andGroupNameEqualTo(groupName); criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes()); int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example); if(r<=0){ //未修改成功,其他线程并发修改 log.info("并发修改,稍后处理"); } } }else{ //4. 判断如果没有消费过... mqConsumerLog = new TradeMqConsumerLog(); mqConsumerLog.setMsgTag(tags); mqConsumerLog.setMsgKey(keys); mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode()); mqConsumerLog.setMsgBody(body); mqConsumerLog.setMsgId(msgId); mqConsumerLog.setConsumerTimes(0); //将消息处理信息添加到数据库 mqConsumerLogMapper.insert(mqConsumerLog); } //5. 
回退库存 MQEntity mqEntity = JSON.parseObject(body, MQEntity.class); Long goodsId = mqEntity.getGoodsId(); TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId); goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum()); goodsMapper.updateByPrimaryKey(goods); //记录库存操作日志 TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog(); goodsNumberLog.setOrderId(mqEntity.getOrderId()); goodsNumberLog.setGoodsId(goodsId); goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum()); goodsNumberLog.setLogTime(new Date()); goodsNumberLogMapper.insert(goodsNumberLog); //6. 将消息的处理状态改为成功 mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode()); mqConsumerLog.setConsumerTimestamp(new Date()); mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog); log.info("回退库存成功"); } catch (Exception e) { e.printStackTrace(); TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey(); primaryKey.setMsgTag(tags); primaryKey.setMsgKey(keys); primaryKey.setGroupName(groupName); TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey); if(mqConsumerLog==null){ //数据库未有记录 mqConsumerLog = new TradeMqConsumerLog(); mqConsumerLog.setMsgTag(tags); mqConsumerLog.setMsgKey(keys); mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode()); mqConsumerLog.setMsgBody(body); mqConsumerLog.setMsgId(msgId); mqConsumerLog.setConsumerTimes(1); mqConsumerLogMapper.insert(mqConsumerLog); }else{ mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1); mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog); } } } } 复制代码
其余的回退操作(例如优惠券、余额)流程与回退库存基本相同,这里就不再逐一展开了。
结尾
这种就是我们项目中真实的案例了。