从零搭建基于SpringBoot的秒杀系统(六):使用RabbitMQ让订单指定时间后失效

简介: 消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。

(一)RabbitMQ概述

消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。


如果你之前一点都没有接触过RabbitMQ,可以看我的消息中间件系列博客入门:

https://link.juejin.cn/?target=https%3A%2F%2Fblog.csdn.net%2Fqq_41973594%2Fcategory_10218582.html


在本篇博客中,我们要实现购物软件一项很常见的功能,订单失效。以淘宝为例,如果你30分钟内不付款,该订单就会被取消。这里我们将会使用RabbitMQ的异步调用特点实现订单失效。

(二)死信队列

死信(Dead Letter)队列本身就是一种普通的队列,只不过在创建队列过程中通过设置一些参数,将一个普通队列设置为死信队列。与其他消息队列不同的是,死信队列中入栈的消息会根据指定的过期时间等限制参数被移除队列送到另一个队列中。官方文档中对死信队列已经有了详细的介绍:


https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

死信队列用于订单失效是一种比较靠谱的方案,设置订单死信时间为30分钟,等到30分钟后会进入真实处理任务的队列中,失效的这个行为是在真实队列中实现的。

下面这张图介绍了死信队列的模型:


基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(三)RabbitMQ配置编写

上面的流程介绍已经详细介绍了死信队列的处理方式,接下来通过代码将上面的逻辑编写出来。首先在配置文件中新增rabbitmq相关配置,将队列、交换机以及路由的名称定义在配置文件中。使用rabbitmq的前提是你在PC上已经安装成功rabbitmq



https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

#rabbitmq相关配置
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
mq.env=javayz
#订单超时未支付自动失效-死信队列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#TTL过期时间单位为ms,设置10s方便测试
mq.kill.item.success.kill.expire=10000


在config下新建RabbitmqConfig类


package com.sdxb.secondkill.config;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
    @Autowired
    private Environment env;
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    //单一消费者
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置连接工厂
        factory.setConnectionFactory(connectionFactory);
        //设置数据交换格式
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //消费者监听个数
        factory.setConcurrentConsumers(1);
        //消费端的监听最大个数
        factory.setMaxConcurrentConsumers(1);
        //设置每次发送给消费者的消息数
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }
    //多个消费者
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListennerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //确认消费模式
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.councurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
        return factory;
    }
    //创建rabbit模板
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate template=new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("消息发送成功:correlationData("+correlationData+"),ack("+b+"),cause({"+s+"})");
            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息丢失:exchage("+exchange+"),route("+routingKey+"),replyCode("+replyCode+replyText+"),message("+message+")");
            }
        });
        return template;
    }
    //构建秒杀成功之后-订单超时未支付的死信队列消息模型
    //创建死信队列,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key
    @Bean
    public Queue successKillDeadQueue(){
        Map<String, Object> argsMap= Maps.newHashMap();
        argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
        argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
    }
    //基本交换机
    @Bean
    public TopicExchange successKillDeadProdExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
    }
    //创建基本交换机+基本路由 -> 死信队列 的绑定
    @Bean
    public Binding successKillDeadProdBinding(){
        return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
    }
    //真正的队列
    @Bean
    public Queue successKillRealQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
    }
    //死信交换机
    @Bean
    public TopicExchange successKillDeadExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
    }
    //死信交换机+死信路由->真正队列 的绑定
    @Bean
    public Binding successKillDeadBinding(){
        return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    }
}


在上面这段代码中,前三个Bean属于通用配置,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key,分别表示死信交换机和死信路由,在官网中对这两个参数如何编写有教程:



剩下的操作和图中的流程一样,基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(四)编写生产者和消费者

首先建一个包含订单号和用户信息的DTO,在dto下新建一个KillSuccessUserDto



@Data
@ToString
public class KillSuccessUserDto extends ItemKillSuccess implements Serializable {
    private String code;
    private String userName;
    private String phone;
    private String email;
    private String itemName;
}


在itemKillSuccessMapper中增加一条查询语句,通过订单号查询抢购成功表


@Select("select a.*,b.user_name,b.phone,b.email,c.name as itemName\n" +
        "from item_kill_success as a\n" +
        "left join user b on b.id=a.user_id\n" +
        "left join item c on c.id=a.item_id\n" +
        "where a.code=#{orderNo} and b.is_active=1")
KillSuccessUserDto selectByCode(String orderNo);


在Service下新建RabbitSenderService,作为消息的发送者,首先查询订单是否已经存在,如果存在,就将它送入死信队列,设置死信队列的TTL。在这里为了方便验证,设置TTL为10S


@Service
public class RabbitSenderService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private Environment env;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单
     * @param orderCode
     */
    public void sendKillSuccessOrderExpireMsg(final String orderCode){
        try {
            if (StringUtils.isNotBlank(orderCode)){
                ////查询订单是否存在
                KillSuccessUserDto info=itemKillSuccessMapper.selectByCode(orderCode);
                if (info!=null){
                    //将该消息送入死信队列,并且为了方便测试设置TTL为10秒,这里的ttl在application.properties中配置
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
                    rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            MessageProperties mp=message.getMessageProperties();
                            mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserDto.class);
                            //TODO:动态设置TTL(为了测试方便,暂且设置10s)
                            mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
                            return message;
                        }
                    });
                }
            }
        }catch (Exception e){
            log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e.fillInStackTrace());
        }
    }
}


死信队列的TTL失效后会将消息发送到真实队列中,再写一个消息的接收者来处理业务,在Service下新建RabbitReceiveService,作为消息的消费者,将成功订单的status设置为-1,表示失效。


@Service
public class RabbitReceiveService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 用户秒杀成功后超时未支付-监听者
     * @param info
     */
    @RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
    public void consumeExpireOrder(KillSuccessUserDto info){
        try {
            log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);
            if (info!=null){
                ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
                if (entity!=null && entity.getStatus().intValue()==0){
                    //将成功订单的status设置为-1,表示失效
                    itemKillSuccessMapper.expireOrder(info.getCode());
                }
            }
        }catch (Exception e){
            log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
        }
    }
}


最后在KillServiceImpl中添加抢购成功后的业务:


if (itemKillSuccessMapper.countByKillUserId(itemKill.getId(),userId) <= 0){
    int res=itemKillSuccessMapper.insertSelective(entity);
    if(res>0){
        //处理抢购成功后的流程
        //这里的业务可以自己加
        //将订单送入死信队列
        rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
    }
}


(五)额外的处理方式

除使用RabbitMQ让订单失效以外,我们还可以使用定时任务不断轮询数据库,如果发现超过了失效时间但是status还是等于0的情况,则另这个属性等于-1

在config下新建SchedulerConfig,通过线程池注册定时任务


@Configuration
public class SchedulerConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //通过线程池注册定时任务
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}


在Service下新建SchedulerService,定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效。


@Configuration
public class SchedulerService {
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    @Autowired
    private Environment env;
    //定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效
    @Scheduled(cron = "0/30 * * * * ?")
    public void schedulerExpireOrders() {
        List<ItemKillSuccess> list = itemKillSuccessMapper.selectExpireOrders();
        if(list!=null&&!list.isEmpty()){
            for (ItemKillSuccess item : list) {
                if (item!=null && item.getStatus().intValue()==0 && item.getDiffTime()>env.getProperty("scheduler.expire.orders.time",Integer.class)){
                    itemKillSuccessMapper.expireOrder(item.getCode());
                }
            }
        }
    }
}


在配置文件中增加下面这条:


#定时任务订单失效时间,单位秒
scheduler.expire.orders.time=30


最后要让定时任务生效还需要在启动类中增加一条注解


@EnableScheduling



(六) 效果展示

当刚下订单后,status为0,表示订单抢购但未支付


过了指定之间后,status变为-1,表示订单已经失效


到目前的代码的代码https://link.juejin.cn/?target=https%3A%2F%2Fgithub.com%2FOliverLiy%2FSecondKill%2Ftree%2Fversion5.0


我搭建了一个微信公众号《Java鱼仔》,分享大量java知识点与学习经历,如果你对本项目有任何疑问,欢迎在公众号中联系我,我会尽自己所能为大家解答。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
522 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
2月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
926 1
|
7月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
7月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
198 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
170 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
6月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
228 32
|
5月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
835 0