开发者学堂课程【RocketMQ 知识精讲与项目实战(第二阶段):回退库存流程分析】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/703/detail/12428
回退库存流程分析
1、实现接口修改为 RocketMQListener 和之前所做的 demo 一样,需要指定当前泛型,泛型就是对应的消息发送方,消息发送的类型 message,也可以指定 message 类型作为泛型,
@Override
public void onMessage(Message message)
重写的方法里面就会去接收 message 类型的消息,但是需要修改 message 为 MessageExt,pogo 类,MessageExt是 message 的子类,用它接收 message 类型的消息也可以。用它的好处是有 msgid 消息的 id,将来会用消息的 id进行存储。
@Override
public void onMessage(Message
ext
message)
更改的是优惠卷,shop-goods-service,shop-order-service,shop-user- service都要进行修改,指定类型。
public class CanceIMQListener implements RocketMQListener<MessageExt>{
@Override
public void onMessage(MessageExt messageExt) {
2、回退库存
首先接收到消息后,解析消息内容,查询消息消费记录,为保证消息处理的密动性,将来会将处理的消息存到数据库中,接收到消息后,不是进行消费而是判断消息有没有进行处理,如果已经做过处理就不用再进行处理,牵扯回退库存,牵扯回退优惠卷以及回退余额,如果服务端发送了重复的消息,如果进行重复消费会对业务造成影响,所以消息的密动性是需要在消费端进行保证的,查询消息消费记录就是查询 trade_mq_consumer_log 表。
表里面有 msg_id,group_name,msg_tag,msg_key,msg_body 消息的主题内容以及目前消息处理的状态,如果根据 group_name,msg_tag,msg_key 查询到已经处理过的消息,那么当接收到相同消息的时候,就可以直接舍弃掉,这样就可以保证密动性,consumer_times 指的是消费的次数,最多不能超过三次,可以消费失败,但是需要进行新的处理,最终失败的次数不能超过三次,超过三次也会进行舍弃,consumer_timestamp 消费时间,remark 备注的信息。
所以进行正式的消息处理之前先进行查询,如果查询到记录,要进行判断,判断当前的状态,如果是已经处理成功的就直接结束,如果是正在处理的也直接结束,不用再做任何动作,如果是处理失败,就是之前处理过消息,但是发生了异常,判断当前失败的次数,不足三次使用数据库的乐观锁将消息处理状态改为正在处理,使用消息库的乐观锁是因为如果在高并发的情况下多个线程同时进行消息的处理,要在数据库更新这里保证互视性。
左边的分支处理过的消息。
如果查询消息消费记录没有查询到,说明当前消息没有处理过,要在数据库中添加一条已经正在处理的消息,和左边的流程保持一致,处理回退库存,就是将消息中的商品数量加上。
注意将当前的消息处理状态改为成功。