从零搭建基于SpringBoot的秒杀系统(六):使用RabbitMQ让订单指定时间后失效

简介: 消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。

(一)RabbitMQ概述

消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。


如果你之前一点都没有接触过RabbitMQ,可以看我的消息中间件系列博客入门:

https://link.juejin.cn/?target=https%3A%2F%2Fblog.csdn.net%2Fqq_41973594%2Fcategory_10218582.html


在本篇博客中,我们要实现购物软件一项很常见的功能,订单失效。以淘宝为例,如果你30分钟内不付款,该订单就会被取消。这里我们将会使用RabbitMQ的异步调用特点实现订单失效。

(二)死信队列

死信(Dead Letter)队列本身就是一种普通的队列,只不过在创建队列过程中通过设置一些参数,将一个普通队列设置为死信队列。与其他消息队列不同的是,死信队列中入栈的消息会根据指定的过期时间等限制参数被移除队列送到另一个队列中。官方文档中对死信队列已经有了详细的介绍:


https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

死信队列用于订单失效是一种比较靠谱的方案,设置订单死信时间为30分钟,等到30分钟后会进入真实处理任务的队列中,失效的这个行为是在真实队列中实现的。

下面这张图介绍了死信队列的模型:


基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(三)RabbitMQ配置编写

上面的流程介绍已经详细介绍了死信队列的处理方式,接下来通过代码将上面的逻辑编写出来。首先在配置文件中新增rabbitmq相关配置,将队列、交换机以及路由的名称定义在配置文件中。使用rabbitmq的前提是你在PC上已经安装成功rabbitmq



https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

#rabbitmq相关配置
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
mq.env=javayz
#订单超时未支付自动失效-死信队列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#TTL过期时间单位为ms,设置10s方便测试
mq.kill.item.success.kill.expire=10000


在config下新建RabbitmqConfig类


package com.sdxb.secondkill.config;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
    @Autowired
    private Environment env;
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    //单一消费者
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置连接工厂
        factory.setConnectionFactory(connectionFactory);
        //设置数据交换格式
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //消费者监听个数
        factory.setConcurrentConsumers(1);
        //消费端的监听最大个数
        factory.setMaxConcurrentConsumers(1);
        //设置每次发送给消费者的消息数
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }
    //多个消费者
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListennerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //确认消费模式
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.councurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
        return factory;
    }
    //创建rabbit模板
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate template=new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("消息发送成功:correlationData("+correlationData+"),ack("+b+"),cause({"+s+"})");
            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息丢失:exchage("+exchange+"),route("+routingKey+"),replyCode("+replyCode+replyText+"),message("+message+")");
            }
        });
        return template;
    }
    //构建秒杀成功之后-订单超时未支付的死信队列消息模型
    //创建死信队列,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key
    @Bean
    public Queue successKillDeadQueue(){
        Map<String, Object> argsMap= Maps.newHashMap();
        argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
        argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
    }
    //基本交换机
    @Bean
    public TopicExchange successKillDeadProdExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
    }
    //创建基本交换机+基本路由 -> 死信队列 的绑定
    @Bean
    public Binding successKillDeadProdBinding(){
        return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
    }
    //真正的队列
    @Bean
    public Queue successKillRealQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
    }
    //死信交换机
    @Bean
    public TopicExchange successKillDeadExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
    }
    //死信交换机+死信路由->真正队列 的绑定
    @Bean
    public Binding successKillDeadBinding(){
        return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    }
}


在上面这段代码中,前三个Bean属于通用配置,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key,分别表示死信交换机和死信路由,在官网中对这两个参数如何编写有教程:



剩下的操作和图中的流程一样,基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(四)编写生产者和消费者

首先建一个包含订单号和用户信息的DTO,在dto下新建一个KillSuccessUserDto



@Data
@ToString
public class KillSuccessUserDto extends ItemKillSuccess implements Serializable {
    private String code;
    private String userName;
    private String phone;
    private String email;
    private String itemName;
}


在itemKillSuccessMapper中增加一条查询语句,通过订单号查询抢购成功表


@Select("select a.*,b.user_name,b.phone,b.email,c.name as itemName\n" +
        "from item_kill_success as a\n" +
        "left join user b on b.id=a.user_id\n" +
        "left join item c on c.id=a.item_id\n" +
        "where a.code=#{orderNo} and b.is_active=1")
KillSuccessUserDto selectByCode(String orderNo);


在Service下新建RabbitSenderService,作为消息的发送者,首先查询订单是否已经存在,如果存在,就将它送入死信队列,设置死信队列的TTL。在这里为了方便验证,设置TTL为10S


@Service
public class RabbitSenderService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private Environment env;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单
     * @param orderCode
     */
    public void sendKillSuccessOrderExpireMsg(final String orderCode){
        try {
            if (StringUtils.isNotBlank(orderCode)){
                ////查询订单是否存在
                KillSuccessUserDto info=itemKillSuccessMapper.selectByCode(orderCode);
                if (info!=null){
                    //将该消息送入死信队列,并且为了方便测试设置TTL为10秒,这里的ttl在application.properties中配置
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
                    rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            MessageProperties mp=message.getMessageProperties();
                            mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserDto.class);
                            //TODO:动态设置TTL(为了测试方便,暂且设置10s)
                            mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
                            return message;
                        }
                    });
                }
            }
        }catch (Exception e){
            log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e.fillInStackTrace());
        }
    }
}


死信队列的TTL失效后会将消息发送到真实队列中,再写一个消息的接收者来处理业务,在Service下新建RabbitReceiveService,作为消息的消费者,将成功订单的status设置为-1,表示失效。


@Service
public class RabbitReceiveService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 用户秒杀成功后超时未支付-监听者
     * @param info
     */
    @RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
    public void consumeExpireOrder(KillSuccessUserDto info){
        try {
            log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);
            if (info!=null){
                ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
                if (entity!=null && entity.getStatus().intValue()==0){
                    //将成功订单的status设置为-1,表示失效
                    itemKillSuccessMapper.expireOrder(info.getCode());
                }
            }
        }catch (Exception e){
            log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
        }
    }
}


最后在KillServiceImpl中添加抢购成功后的业务:


if (itemKillSuccessMapper.countByKillUserId(itemKill.getId(),userId) <= 0){
    int res=itemKillSuccessMapper.insertSelective(entity);
    if(res>0){
        //处理抢购成功后的流程
        //这里的业务可以自己加
        //将订单送入死信队列
        rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
    }
}


(五)额外的处理方式

除使用RabbitMQ让订单失效以外,我们还可以使用定时任务不断轮询数据库,如果发现超过了失效时间但是status还是等于0的情况,则另这个属性等于-1

在config下新建SchedulerConfig,通过线程池注册定时任务


@Configuration
public class SchedulerConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //通过线程池注册定时任务
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}


在Service下新建SchedulerService,定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效。


@Configuration
public class SchedulerService {
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    @Autowired
    private Environment env;
    //定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效
    @Scheduled(cron = "0/30 * * * * ?")
    public void schedulerExpireOrders() {
        List<ItemKillSuccess> list = itemKillSuccessMapper.selectExpireOrders();
        if(list!=null&&!list.isEmpty()){
            for (ItemKillSuccess item : list) {
                if (item!=null && item.getStatus().intValue()==0 && item.getDiffTime()>env.getProperty("scheduler.expire.orders.time",Integer.class)){
                    itemKillSuccessMapper.expireOrder(item.getCode());
                }
            }
        }
    }
}


在配置文件中增加下面这条:


#定时任务订单失效时间,单位秒
scheduler.expire.orders.time=30


最后要让定时任务生效还需要在启动类中增加一条注解


@EnableScheduling



(六) 效果展示

当刚下订单后,status为0,表示订单抢购但未支付


过了指定之间后,status变为-1,表示订单已经失效


到目前的代码的代码https://link.juejin.cn/?target=https%3A%2F%2Fgithub.com%2FOliverLiy%2FSecondKill%2Ftree%2Fversion5.0


我搭建了一个微信公众号《Java鱼仔》,分享大量java知识点与学习经历,如果你对本项目有任何疑问,欢迎在公众号中联系我,我会尽自己所能为大家解答。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
JavaScript Java 关系型数据库
基于springboot的项目管理系统
本文探讨项目管理系统在现代企业中的应用与实现,分析其研究背景、意义及现状,阐述基于SSM、Java、MySQL和Vue等技术构建系统的关键方法,展现其在提升管理效率、协同水平与风险管控方面的价值。
|
2月前
|
搜索推荐 JavaScript Java
基于springboot的儿童家长教育能力提升学习系统
本系统聚焦儿童家长教育能力提升,针对家庭教育中理念混乱、时间不足、个性化服务缺失等问题,构建科学、系统、个性化的在线学习平台。融合Spring Boot、Vue等先进技术,整合优质教育资源,提供高效便捷的学习路径,助力家长掌握科学育儿方法,促进儿童全面健康发展,推动家庭和谐与社会进步。
|
2月前
|
JavaScript Java 关系型数据库
基于springboot的古树名木保护管理系统
本研究针对古树保护面临的严峻挑战,构建基于Java、Vue、MySQL与Spring Boot技术的信息化管理系统,实现古树资源的动态监测、数据管理与科学保护,推动生态、文化与经济可持续发展。
|
2月前
|
监控 安全 JavaScript
2025基于springboot的校车预定全流程管理系统
针对传统校车管理效率低、信息不透明等问题,本研究设计并实现了一套校车预定全流程管理系统。系统采用Spring Boot、Java、Vue和MySQL等技术,实现校车信息管理、在线预定、实时监控等功能,提升学校管理效率,保障学生出行安全,推动教育信息化发展。
|
3月前
|
存储 JavaScript Java
基于springboot的大学公文收发管理系统
本文介绍公文收发系统的研究背景与意义,分析其在数字化阅读趋势下的必要性。系统采用Vue、Java、Spring Boot与MySQL技术,实现高效、便捷的公文管理与在线阅读,提升用户体验与信息处理效率。
|
2月前
|
人工智能 Java 关系型数据库
基于springboot的画品交流系统
本项目构建基于Java+Vue+SpringBoot+MySQL的画品交流系统,旨在解决传统艺术交易信息不透明、流通受限等问题,融合区块链与AI技术,实现画品展示、交易、鉴赏与社交一体化,推动艺术数字化转型与文化传播。
|
2月前
|
JavaScript Java 关系型数据库
基于springboot的高校运动会系统
本系统基于Spring Boot、Vue与MySQL,实现高校运动会报名、赛程安排及成绩管理的全流程信息化,提升组织效率,杜绝信息错漏与冒名顶替,推动体育赛事智能化发展。
|
2月前
|
JavaScript 安全 Java
基于springboot的大学生兼职系统
本课题针对大学生兼职信息不对称、权益难保障等问题,研究基于Spring Boot、Vue、MySQL等技术的兼职系统,旨在构建安全、高效、功能完善的平台,提升大学生就业竞争力与兼职质量。
|
2月前
|
JavaScript Java 关系型数据库
基于springboot的美食城服务管理系统
本系统基于Spring Boot、Java、Vue和MySQL技术,构建集消费者服务、商家管理与后台监管于一体的美食城综合管理平台,提升运营效率与用户体验。

热门文章

最新文章