回退库存幂等性处理|学习笔记

简介: 快速学习回退库存幂等性处理

开发者学堂课程【RocketMQ 知识精讲与项目实战(第二阶段)回退库存幂等性处理】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/703/detail/12429


回退库存幂等性处理


1、按照流程分析现在用代码实现出来打开当前的 listener 处理当前的流程

@Slf4j

@Component

@RocketMQMessageListener

@Value("${mq . order . consumer. group .name}" )消费者组名

private String groupName;

@Autowired

private TradeGoodsMapper goodsMapper ;

有接口后注入进来

@Autowired

Private TradeMqConsumerLogMapper mqConsumerLogMapper ;

@Autowired

private TradeGoodsNumberLogMapper goodsNumberLogMapper;

@Override

public void onMessage (MessageExt messageExt) {

String msgId =null;

String tags,=null;

String keys,=null;

String body,=null;

try {

//1.解析消息内容,获得消息内容body是字节数组需要转换成字符串

String msgId = messageExt. getMsgId();

String tags = messageExt. getTags();

String keys = messageExt. getKeys();

String body = new String(messageExt.getBody(), charsetName: "UTF-8" );

//接收到消息后也进行打印代表当前已经收到消息

Log. info("接受消息成功");

//2.查询消息消费记录

依赖于表的 mapper 去查进入资料找到

TradeMqConsumerLogMapper

image.png

指定 mapper 的接口再找到 TradeMqConsumerLogMapper.xml 配置文件放到 mappe r的文件下面

image.png

//2.查询消息消费记录,通过表可以看到是一个复合的主键,把三个值都传递过去查询唯一的一条记录需要PrimaryKey 对象封装当前的主键,有三个属性

image.png

TradeMqConsumerLogKey primaryKey = new

TradeMqConsumerLogKey();

primaryKey. setMsgTag(tags);

primaryKey. setMsgKey(keys);

primaryKey. setGroupName ( groupName) ;

TradeMqConsumerLog mqConsumerLog=mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

消费过要获得它当前的状态

image.png

if(mqConsumerLog!=null){

//3.判断如果消费过...

//3.1获得消息处理状态

Integer status = mqConsumerLog. getConsumerStatus();

//处理过...返回

if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode() . intValue( )==status . intValue()){

如果是处理过的

Log. info("消息:"+msgId+",已经处理过");

return;

}

//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode(). intValue( )==status . intValue()){

Log. info("消息:"+msgId+",正在处理");

return;

}

//处理失败

if( ShopCode .SHOP_ MQ_ MESSAGE_ STATUS_ FAIL.

getCode(). intValue()==status . intValue()){

判断当前消息的处理次数

image.png

//获得消息处理次数

Integer times = mqConsumerLog. getConsumerTimes();

if(times>3){

Log. info(" 消息 : "+msgId+" ,消息处理超过3不能再进行处理了");

return;

}

//如果没有超过要使用乐观锁更改消息处理的状态为正在处理

mqConsumerLog.

setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ PROCESSING . getCode());

//第一个参数设置完成第二个参数数据库的乐观锁就是指定当前更改的条件封装 example

TradeMqConsumerLogExample example = new

TradeMqConsumerLogExample();

//为了防止并发修改的复制性设置当前更改的条件

TradeMqConsumerLogExample .Criteria criteria =

example . createCriteria();

criteria. andMsgTagEqualTo(mqConsumerLog

getMsgTag());

criteria. andMsgKeyEqualTo(mqConsumerLog

getMsgKey());

criteria andGroupNameEqualTo( groupName);

//注意现在更改的时候次数应该和查询出来的次数一样如果别人已经修改当前的修改就会出现异常

criteria.andConsumerTimesEqualTo(mqConsumerLog

getConsumerTimes();

Intr=mqConsumerLogMapper.

updateByExampleSelective(mqConsumerLog, example)

If(r<=0){

//未修改成功其他线程并发修改提示信息

Log. info("并发修改,稍后处理");

}

}

}else{

//开始走右边的分支当前没有查询出来消息处理的信息

//4.判断如果没有消费过...

mqConsumerLog = new TradeMqConsumerLog();

//设置当前消息的内容

mqConsumerLog. setMsgTag(tags);

mqConsumerLog . setMsgKey(keys);

//状态改为正在处理mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ PROCESSING. getCode());

mqConsumerLog. setMsgBody(body);

mqConsumerLog . setMsgId(msgId) ;

//消费的次数指的是失败的次数

mqConsumerLog . setConsumerTimes(0);

//将消息处理信息添加到数据库

mqConsumerLogMapper. insert (mqConsumerLog) ;

}

2、参考笔记

//回退库存

CancelorderMQ cancelordedMQ= JSON. parseobject (body,

CancelorderMQ.class);

//封装goods对象

TradeGoodsNumberLoggoods=new TradeGoodsNumberLog();

goods . setGoods Id(cancelorderMQ. getGoodsId());

goods.setGoodsNumber(cancelorderMQ. getGoodsNumber());

goods . setorderId(cancelorderMQ. getorderId());

goodsService. addGoodsNumber (goods);

3、//5.回退库存

//body设置的是mqentity.class将mqentity.class进行转换获得字符串

MQEntity mqEntity = JSON. parseObject(body,

MQEntity.class);

//得到GoodsId

Long goodsId=mqEntity . getGoodsId();

//goodsid传递过去查询出当前的商品对象

TradeGoodsgoods=goodsMapper.

selectByPrimaryKey(goodsId)

//更改库存

goods.setGoodsNumber(goods. getGoodsNumber( )

+mqEntity. getGoodsNum());

//把goods更新回去

goodsMapper . updateByPrimaryKey(goods);

//记录库存操作日志

TradeGoodsNumberLog goodsNumberLog = new

TradeGoodsNumberLog( );

goodsNumberLog. setOrderId( mqEntity. getOrderId());

goodsNumberLog. setGoodsId(goodsId);

goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());

//操作日期

goodsNumberLog. setLogTime(new Date());

goodsNumberLogMapper . insert(goodsNumberLog);

4、整个流程走到更改消息处理状态为成功

image.png

//6.将消息的处理状态改为成功

//不管是处理过的还是为处理过的都已经把 mqConsumerLog 插入到数据库中从数据库中把当前的消息查询出来或者可以直接复用 mqConsumerLog只不过需要把 ConsumerStatus 改为成功

mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ SUCCESS . getCode());

查看有哪些信息是没有进行设置的

image.png

//时间戳

mqConsumerLog. setConsumerTimestamp(new Date());

//更新

mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog) ;

//整个消息的处理在商品服务回退库存的操作全部实现完成日志打印

Log. info("回退库存成功");

} catch (Exception e) {

e. printStackTrace();

//异常后进行消息处理失败的记录允许重复进行消费

TradeMqConsumerLog mqConsumerLog = new

TradeMqConsumerLog();

//设置为成员变量

String msgId =null;

String tags,=null;

String keys,=null;

String body,=null;

try {

//1.解析消息内容

//分别赋值

msgId = messageExt. getMsgId();

tags = messageExt. getTags();

keys = messageExt. getKeys();

body=new String(messageExt.getBody(), charsetName: "UTF-8" );

mqConsumerLog . setMsgTag(tags);

mqConsumerLog . setMsgKey(keys);mqConsumerLog . setConsumerStatus (ShopCode . SHOP_ _MQ_ MESSAGE_ STATUS_ PROCESSING. getCode

mqConsumerLog . setMsgBody(body);

mqConsumerLog . setMsgId( msgId);

mqConsumerLog . setConsumerTimes();

//如果在catch里面处理失败抓到异常要对于当前的错误次数加一可以换一种写法先根据主键把当前的消费情况查询出来进行记录

TradeMqConsumerLogKey primaryKey = new

TradeMqConsumerLogKey();

primaryKey. setMsgTag(tags);

primaryKey. setMsgKey(keys);

primaryKey. setGroupName groupName);

TradeMqConsumerLog mqConsumerLog=

mqConsumerLogMapper.

selectByPrimaryKey(primaryKey);

if(mqConsumerLog==null){

//如果当前没有查询出来数据库未有记录

mqConsumerLog = new TradeMqConsumerLog();

mqConsumerLog. setMsgTag(tags);

mqConsumerLog. setMsgKey(keys);

mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ FAIL. getCode())

mqConsumerLog.setMsgBody(body);

mqConsumerLog.setMsgId(msgId);

mqConsumerLog.setConsumerTimes(1);

//做插入操作

mqConsumerLogMapper . insert (mqConsumerLog) ;

//如果不为空只需要做更新即可

}else{

mqConsumerLog.setConsumerTimes(mqConsumerLog.

getConsumerTimes()+1);)

mqConsumerLogMapper.updateByPrimaryKeySelective

( mqConsumerLog) ;

根据当前的错误情况做了更新如果当前第一次就处理失败处理次数就是一如果不是第一次就加一再更新即可

4、总结

接收到信息之后做了消息内容的解析

msgId = messageExt. getMsgId();

tags= messageExt. getTags();

keys= messageExt. getKeys();

body= new String(messageExt . getBody(), charsetName: "UTF-8");

查询当前消息消费的记录根据主键查主键是符合主键三个都是主键

TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();

primaryKey. setMsgTag(tags);

primaryKey. setMsgKey(keys);

primaryKey. setGroupName ( groupName) ;

TradeMqConsumerLog mqConsumerLog=mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

如果查询完不为空再去判断它的状态如果状态是成功就直接返回如果是正在处理也直接返回如果处理失败就获得当前的次数如果次数大于3就不用再处理如果不大于3,状态改为正在处理更新到数据库

if(mqConsumerLog!=null){

//3.判断如果消费过...

//3.1获得消息处理状态

Integer status = mqConsumerLog. getConsumerStatus();

//处理过...返回

if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode() . intValue( )==status . intValue()){

Log. info("消息:"+msgId+",已经处理过");

return;

}

//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode(). intValue( )==status . intValue()){

Log. info("消息:"+msgId+",正在处理");

return;

}

//处理失败if( ShopCode .SHOP_ MQ_ MESSAGE_ STATUS_ FAIL.

getCode(). intValue()==status . intValue()){

//获得消息处理次数

Integer times = mqConsumerLog. getConsumerTimes();

if(times>3){

Log. info(" 消息 : "+msgId+" ,消息处理超过3不能再进行处理了");

return;

}mqConsumerLog.

setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ PROCESSING . getCode());

//使用数据库乐观锁更新保证在并发的情况下数据库的互视性

TradeMqConsumerLogExample example = new

TradeMqConsumerLogExample();

TradeMqConsumerLogExample .Criteria criteria=

example . createCriteria();

criteria. andMsgTagEqualTo(mqConsumerLog getMsgTag());

criteria. andMsgKeyEqualTo(mqConsumerLog getMsgKey());

criteria andGroupNameEqualTo( groupName);

criteria.andConsumerTimesEqualTo(mqConsumerLog

getConsumerTimes();

Intr=mqConsumerLogMapper.

updateByExampleSelective(mqConsumerLog, example)

If(r<=0){

//未修改成功其他线程并发修改提示信息

Log. info("并发修改,稍后处理");

}

}

如果在查询时没有查询到当前处理的消息就将当前的消息的记录插入到数据库当前的状态是正在处理

}else{

//4.判断如果没有消费过...

mqConsumerLog = new TradeMqConsumerLog();

mqConsumerLog. setMsgTag(tags);

mqConsumerLog . setMsgKey(keys);mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ PROCESSING. getCode());

mqConsumerLog. setMsgBody(body);

mqConsumerLog . setMsgId(msgId) ;

mqConsumerLog . setConsumerTimes(0);

//将信息处理信息添加到数据库

mqConsumerLogMapper. insert (mqConsumerLog) ;

}

//5.回退库存先将消息转换成mqEntity,取出goodsId查询出商品对象将商品数量加上当前消息中商品的数量更新回去

MQEntity mqEntity = JSON. parseObject(body,

MQEntity.class);

Long goodsId=mqEntity . getGoodsId();

TradeGoods goods=goodsMapper.

selectByPrimaryKey(goodsId);

goods.setGoodsNumber(goods. getGoodsNumber( )

+mqEntity. getGoodsNum());

goodsMapper . updateByPrimaryKey(goods);

//记录库存操作日志

TradeGoodsNumberLog goodsNumberLog = new

TradeGoodsNumberLog( );

goodsNumberLog. setOrderId( mqEntity. getOrderId());

goodsNumberLog. setGoodsId(goodsId);

goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());

goodsNumberLog. setLogTime(new Date());

goodsNumberLogMapper . insert(goodsNumberLog);

整个处理完之后说明库存已经更新完成

//6.将消息的处理状态改为成功mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ SUCCESS . getCode());

mqConsumerLog. setConsumerTimestamp(new Date());

mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog) ;

Log. info("回退库存成功");

} catch (Exception e) {

e. printStackTrace();

如果出现异常进行记录消息处理失败的次数先去查询数据库中已经有的记录如果没有查到代表第一次处理失败添加到数据库次数设置为一如果能够查到把它当前的拿出来次数加一再更新到数据库

TradeMqConsumerLogKey primaryKey = new

TradeMqConsumerLogKey();

primaryKey. setMsgTag(tags);

primaryKey. setMsgKey(keys);

primaryKey. setGroupName groupName);

TradeMqConsumerLog mqConsumerLog=

mqConsumerLogMapper.

selectByPrimaryKey(primaryKey);

if(mqConsumerLog==null){

//数据库未有记录

mqConsumerLog = new TradeMqConsumerLog();

mqConsumerLog. setMsgTag(tags);

mqConsumerLog. setMsgKey(keys);

mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ FAIL. getCode())

mqConsumerLog.setMsgBody(body);

mqConsumerLog.setMsgId(msgId);

mqConsumerLog.setConsumerTimes(1);

mqConsumerLogMapper . insert (mqConsumerLog) ;

}else{

mqConsumerLog.setConsumerTimes(mqConsumerLog.

getConsumerTimes()+1);)

mqConsumerLogMapper.updateByPrimaryKeySelective ( mqConsumerLog) ;

}

}

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
存储 缓存 NoSQL
防止订单重复提交或支付分布式锁方案设计
防止订单重复提交或支付分布式锁方案设计
361 0
|
SQL 缓存 NoSQL
接口的幂等性设计和防重保证,详细分析幂等性的几种实现方法
本篇文章详细说明了幂等性,解释了什么是幂等性,幂等性的使用场景,讨论了幂等和防重的概念。分析了幂等性的情况以及如何设计幂等性服务。阐述了幂等性实现防重的几种策略,包括乐关锁,防重表,分布式锁,token令牌以及支付缓冲区。
5155 0
接口的幂等性设计和防重保证,详细分析幂等性的几种实现方法
|
7月前
|
消息中间件 缓存 NoSQL
如何实现消费幂等 ?
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:**消费幂等**。
如何实现消费幂等 ?
|
8月前
|
消息中间件 存储 Kafka
MQ保证消息幂等机制
MQ保证消息幂等机制
144 0
|
10月前
|
存储 NoSQL Redis
下单接口防重提交问题
下单接口防重提交问题
|
11月前
|
前端开发 NoSQL 数据库
幂等性与防重的区别
自己的一点小理解
388 0
|
数据库
分布式系统如何防止重复下单?
分布式系统如何防止重复下单?
249 0
|
消息中间件 存储 资源调度
订单超时怎么处理?我们用这种方案
在电商业务下,许多订单超时场景都在24小时以上,对于超时精度没有那么敏感,并且有海量订单需要批处理,推荐使用基于定时任务的跑批解决方案。
1122 0
订单超时怎么处理?我们用这种方案
|
SQL 负载均衡 NoSQL
【防止重复下单】分布式系统接口幂等性实现方案
【防止重复下单】分布式系统接口幂等性实现方案
1292 0
【防止重复下单】分布式系统接口幂等性实现方案
|
消息中间件 存储 数据库
回退库存流程分析|学习笔记
快速学习回退库存流程分析
137 0
回退库存流程分析|学习笔记