开发者学堂课程【RocketMQ 知识精讲与项目实战(第二阶段):回退库存幂等性处理】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/703/detail/12429
回退库存幂等性处理
1、按照流程分析,现在用代码实现出来。打开当前的 listener 处理当前的流程。
@Slf4j
@Component
@RocketMQMessageListener
@Value("${mq . order . consumer. group .name}" )
消费者组名
private String groupName;
@Autowired
private TradeGoodsMapper goodsMapper ;
有接口后,注入进来。
@Autowired
Private TradeMqConsumerLogMapper mqConsumerLogMapper ;
@Autowired
private TradeGoodsNumberLogMapper goodsNumberLogMapper;
@Override
public void onMessage (MessageExt messageExt) {
String msgId =null;
String tags,=nu
ll
;
String keys,=nu
ll
;
String body,=null;
try {
//1.解析消息内容,获得消息内容,body是字节数组,需要转换成字符串
String msgId = messageExt. getMsgId();
String tags = messageExt. getTags();
String keys = messageExt. getKeys();
String body = new String(messageExt.getBody(), charsetName: "UTF-8" );
//接收到消息后,也进行打印,代表当前已经收到消息
Log. info("接受消息成功");
//2.查询消息消费记录
依赖于表的 mapper 去查,进入资料,找到
TradeMqConsumerLogMapper。
指定 mapper 的接口,再找到 TradeMqConsumerLogMapper.xml 配置文件,放到 mappe r的文件下面。
//2.查询消息消费记录,通过表可以看到是一个复合的主键,把三个值都传递过去,查询唯一的一条记录,需要PrimaryKey 对象,封装当前的主键,有三个属性。
TradeMqConsumerLogKey primaryKey = new
TradeMqConsumerLogKey();
primaryKey. setMsgTag(tags);
primaryKey. setMsgKey(keys);
primaryKey. setGroupName ( groupName) ;
TradeMqConsumerLog mqConsumerLog=mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
消费过要获得它当前的状态
if(mqConsumerLog!=nu
ll
){
//3.判断如果消费过...
//3.1获得消息处理状态
Integer status = mqConsumerLog. getConsumerStatus();
//处理过...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode() . intValue( )==status . intValue()){
如果是处理过的
Log. info("
消息
:"+msgId+",
已经处理过
");
return;
}
//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode(). intValue( )==status . intValue()){
Log. info("
消息
:"+msgId+",
正在处理
");
return;
}
//处理失败
if( ShopCode .SHOP_ MQ_ MESSAGE_ STATUS_ FAIL.
getCode(). intValue()==status . intValue()){
判断当前消息的处理次数
//获得消息处理次数
Integer times = mqConsumerLog. getConsumerTimes();
if(times>3){
Log. info("
消息
: "+msgId+" ,
消息处理超过
3
次
,
不能再进行处理了
");
return;
}
//如果没有超过要使用乐观锁更改消息处理的状态为正在处理
mqConsumerLog.
setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ PROCESSING . getCode());
//第一个参数设置完成,第二个参数数据库的乐观锁就是指定当前更改的条件,封装 example
TradeMqConsumerLogExample example = new
TradeMqConsumerLogExample();
//为了防止并发修改的复制性,设置当前更改的条件
TradeMqConsumerLogExample .Criteria criteria =
example . createCriteria();
criteria. andMsgTagEqualTo(mqConsumerLog
getMsgTag());
criteria. andMsgKeyEqualTo(mqConsumerLog
getMsgKey());
criteria andGroupNameEqualTo( groupName);
//注意现在更改的时候次数应该和查询出来的次数一样,如果别人已经修改当前的修改就会出现异常
criteria.andConsumerTimesEqualTo(mqConsumerLog
getConsumerTimes();
Int
r
=mqConsumerLogMapper.
updateByExampleSelective(mqConsumerLog, example)
If
(
r
<=0){
//
未修改成功
,
其他线程并发修改
,
提示信息
Log. info("并发修改,稍后处理");
}
}
}else{
//开始走右边的分支,当前没有查询出来消息处理的信息
//4.判断如果没有消费过...
mqConsumerLog = new TradeMqConsumerLog();
//设置当前消息的内容
mqConsumerLog. setMsgTag(tags);
mqConsumerLog . setMsgKey(keys);
//状态改为正在处理mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ PROCESSING. getCode());
mqConsumerLog. setMsgBody(body);
mqConsumerLog . setMsgId(msgId) ;
//消费的次数,指的是失败的次数
mqConsumerLog . setConsumerTimes(0);
//将消息处理信息添加到数据库
mqConsumerLogMapper. insert (mqConsumerLog) ;
}
2、参考笔记
//回退库存
CancelorderMQ cancelordedMQ= JSON. parseobject (body,
CancelorderMQ.class);
//封装goods对象
TradeGoodsNumberLoggoods=new TradeGoodsNumberLog();
goods . setGoods Id(cance
l
orderMQ. getGoodsId());
goods.setGoodsNumber(cancelorderMQ. getGoodsNumber());
goods . setorderId(cancelorderMQ. getorderId());
goodsService. addGoodsNumber (goods);
3、//5.回退库存
//body设置的是mqentity.class,将mqentity.class进行转换,获得字符串
MQEntity mqEntity = JSON. parseObject(body,
MQEntity.class);
//得到GoodsId
Long goodsId=mqEntity . getGoodsId();
//goodsid传递过去,查询出当前的商品对象
TradeGoodsgoods=goodsMapper.
selectByPrimaryKey(goodsId)
//更改库存
goods.setGoodsNumber(goods. getGoodsNumber( )
+mqEntity. getGoodsNum());
//把goods更新回去
goodsMapper . updateByPrimaryKey(goods);
//记录库存操作日志
TradeGoodsNumberLog goodsNumberLog = new
TradeGoodsNumberLog( );
goodsNumberLog. setOrderId( mqEntity. getOrderId());
goodsNumberLog. setGoodsId(goodsId);
goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
//操作日期
goodsNumberLog. setLogTime(new Date());
goodsNumberLogMapper . insert(goodsNumberLog);
4、整个流程走到更改消息处理状态为成功。
//6.将消息的处理状态改为成功
//不管是处理过的还是为处理过的,都已经把 mqConsumerLog 插入到数据库中,从数据库中把当前的消息查询出来,或者可以直接复用 mqConsumerLog,只不过需要把 ConsumerStatus 改为成功
mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ SUCCESS . getCode());
查看有哪些信息是没有进行设置的
//时间戳
mqConsumerLog. setConsumerTimestamp(new Date());
//更新
mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog) ;
//整个消息的处理在商品服务回退库存的操作全部实现完成,日志打印
Log. info("回退库存成功");
} catch (Exception e) {
e. printStackTrace();
//异常后,进行消息处理失败的记录,允许重复进行消费
TradeMqConsumerLog mqConsumerLog = new
TradeMqConsumerLog();
//设置为成员变量,
String msgId =null;
String tags,=nu
ll
;
String keys,=nu
ll
;
String body,=null;
try {
//1.解析消息内容
//分别赋值
msgId = messageExt. getMsgId();
tags = messageExt. getTags();
keys = messageExt. getKeys();
body=new String(messageExt.getBody(), charsetName: "UTF-8" );
mqConsumerLog . setMsgTag(tags);
mqConsumerLog . setMsgKey(keys);
mqConsumerLog . setConsumerStatus (ShopCode . SHOP_ _MQ_ MESSAGE_ STATUS_ PROCESSING. getCode
mqConsumerLog . setMsgBody(body);
mqConsumerLog . setMsgId( msgId);
mqConsumerLog . setConsumerTimes();
//如果在catch里面处理失败抓到异常,要对于当前的错误次数加一,可以换一种写法,先根据主键把当前的消费情况查询出来,进行记录
TradeMqConsumerLogKey primaryKey = new
TradeMqConsumerLogKey();
primaryKey. setMsgTag(tags);
primaryKey. setMsgKey(keys);
primaryKey. setGroupName groupName);
TradeMqConsumerLog mqConsumerLog=
mqConsumerLogMapper.
selectByPrimaryKey(primaryKey);
if(mqConsumerLog==null){
//
如果当前没有查询出来
,
数据库未有记录
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog. setMsgTag(tags);
mqConsumerLog. setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_
FAIL
. getCode())
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes
(1)
;
//
做插入操作
mqConsumerLogMapper . insert (mqConsumerLog) ;
//
如果不为空
,
只需要做更新即可
}else{
mqConsumerLog.setConsumerTimes(mqConsumerLog.
getConsumerTimes()+1);)
mqConsumerLogMapper.updateByPrimaryKeySelective
( mqConsumerLog) ;
根据当前的错误情况做了更新,如果当前第一次就处理失败,处理次数就是一,如果不是第一次,就加一再更新即可。
4、总结
接收到信息之后,做了消息内容的解析
msgId = messageExt. getMsgId();
tags= messageExt. getTags();
keys= messageExt. getKeys();
body= new String(messageExt . getBody(), charsetName: "UTF-8");
查询当前消息消费的记录,根据主键查,主键是符合主键,三个都是主键
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey. setMsgTag(tags);
primaryKey. setMsgKey(keys);
primaryKey. setGroupName ( groupName) ;
TradeMqConsumerLog mqConsumerLog=mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
如果查询完不为空,再去判断它的状态,如果状态是成功就直接返回,如果是正在处理也直接返回,如果处理失败就获得当前的次数,如果次数大于3就不用再处理,如果不大于3,状态改为正在处理,更新到数据库
if(mqConsumerLog!=nu
ll
){
//3.判断如果消费过...
//3.1获得消息处理状态
Integer status = mqConsumerLog. getConsumerStatus();
//处理过...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode() . intValue( )==status . intValue()){
Log. info("
消息
:"+msgId+",
已经处理过
");
return;
}
//正在处理...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode(). intValue( )==status . intValue()){
Log. info("
消息
:"+msgId+",
正在处理
");
return;
}
//处理失败
if( ShopCode .SHOP_ MQ_ MESSAGE_ STATUS_ FAIL.
getCode(). intValue()==status . intValue()){
//
获得消息处理次数
Integer times = mqConsumerLog. getConsumerTimes();
if(times>3){
Log. info("
消息
: "+msgId+" ,
消息处理超过
3
次
,
不能再进行处理了
");
return;
}
mqConsumerLog.
setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_ PROCESSING . getCode());
//
使用数据库乐观锁更新
,
保证在并发的情况下数据库的互视性
TradeMqConsumerLogExample example = new
TradeMqConsumerLogExample();
TradeMqConsumerLogExample .Criteria criteria=
example . createCriteria();
criteria. andMsgTagEqualTo(mqConsumerLog getMsgTag());
criteria. andMsgKeyEqualTo(mqConsumerLog getMsgKey());
criteria andGroupNameEqualTo( groupName);
criteria.andConsumerTimesEqualTo(mqConsumerLog
getConsumerTimes();
Int
r
=mqConsumerLogMapper.
updateByExampleSelective(mqConsumerLog, example)
If
(
r
<=0){
//
未修改成功
,
其他线程并发修改
,
提示信息
Log. info("并发修改,稍后处理");
}
}
如果在查询时没有查询到当前处理的消息,就将当前的消息的记录插入到数据库,当前的状态是正在处理。
}else{
//4.判断如果没有消费过...
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog. setMsgTag(tags);
mqConsumerLog . setMsgKey(keys);
mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ PROCESSING. getCode());
mqConsumerLog. setMsgBody(body);
mqConsumerLog . setMsgId(msgId) ;
mqConsumerLog . setConsumerTimes(0);
//
将信息处理信息添加到数据库
mqConsumerLogMapper. insert (mqConsumerLog) ;
}
//5
.回退库存
,
先将消息转换成
mqEntity,
取出
goodsId
查询出商品对象
,
将商品数量加上当前消息中商品的数量更新回去
MQEntity mqEntity = JSON. parseObject(body,
MQEntity.class);
Long goodsId=mqEntity . getGoodsId();
TradeGoods goods=goodsMapper.
selectByPrimaryKey(goodsId);
goods.setGoodsNumber(goods. getGoodsNumber( )
+mqEntity. getGoodsNum());
goodsMapper . updateByPrimaryKey(goods);
//
记录库存操作日志
TradeGoodsNumberLog goodsNumberLog = new
TradeGoodsNumberLog( );
goodsNumberLog. setOrderId( mqEntity. getOrderId());
goodsNumberLog. setGoodsId(goodsId);
goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
goodsNumberLog. setLogTime(new Date());
goodsNumberLogMapper . insert(goodsNumberLog);
整个处理完之后,说明库存已经更新完成。
//6
.将消息的处理状态改为成功
mqConsumerLog. setConsumerStatus (ShopCode. SHOP_ MQ_ MESSAGE_ STATUS_ SUCCESS . getCode());
mqConsumerLog. setConsumerTimestamp(new Date());
mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog) ;
Log. info("回退库存成功");
} catch (Exception e) {
e. printStackTrace();
如果出现异常,进行记录消息处理失败的次数,先去查询数据库中已经有的记录,如果没有查到代表第一次处理失败,添加到数据库,次数设置为一,如果能够查到,把它当前的拿出来次数加一,再更新到数据库。
TradeMqConsumerLogKey primaryKey = new
TradeMqConsumerLogKey();
primaryKey. setMsgTag(tags);
primaryKey. setMsgKey(keys);
primaryKey. setGroupName groupName);
TradeMqConsumerLog mqConsumerLog=
mqConsumerLogMapper.
selectByPrimaryKey(primaryKey);
if(mqConsumerLog==null){
//
数据库未有记录
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog. setMsgTag(tags);
mqConsumerLog. setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_ MESSAGE_ STATUS_
FAIL
. getCode())
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes
(1)
;
mqConsumerLogMapper . insert (mqConsumerLog) ;
}else{
mqConsumerLog.setConsumerTimes(mqConsumerLog.
getConsumerTimes()+1);)
mqConsumerLogMapper.updateByPrimaryKeySelective ( mqConsumerLog) ;
}
}