从零搭建基于SpringBoot的秒杀系统(六):使用RabbitMQ让订单指定时间后失效

简介: 消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。

(一)RabbitMQ概述

消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。java中常用的消息中间件有ActiveMQ、RabbitMQ、Kafka等等。消息中间件的作用主要有系统解耦、异步调用、流量削峰等等。


如果你之前一点都没有接触过RabbitMQ,可以看我的消息中间件系列博客入门:

https://link.juejin.cn/?target=https%3A%2F%2Fblog.csdn.net%2Fqq_41973594%2Fcategory_10218582.html


在本篇博客中,我们要实现购物软件一项很常见的功能,订单失效。以淘宝为例,如果你30分钟内不付款,该订单就会被取消。这里我们将会使用RabbitMQ的异步调用特点实现订单失效。

(二)死信队列

死信(Dead Letter)队列本身就是一种普通的队列,只不过在创建队列过程中通过设置一些参数,将一个普通队列设置为死信队列。与其他消息队列不同的是,死信队列中入栈的消息会根据指定的过期时间等限制参数被移除队列送到另一个队列中。官方文档中对死信队列已经有了详细的介绍:


https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

死信队列用于订单失效是一种比较靠谱的方案,设置订单死信时间为30分钟,等到30分钟后会进入真实处理任务的队列中,失效的这个行为是在真实队列中实现的。

下面这张图介绍了死信队列的模型:


基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(三)RabbitMQ配置编写

上面的流程介绍已经详细介绍了死信队列的处理方式,接下来通过代码将上面的逻辑编写出来。首先在配置文件中新增rabbitmq相关配置,将队列、交换机以及路由的名称定义在配置文件中。使用rabbitmq的前提是你在PC上已经安装成功rabbitmq



https://link.juejin.cn/?target=https%3A%2F%2Fwww.rabbitmq.com%2Fdlx.html

#rabbitmq相关配置
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
mq.env=javayz
#订单超时未支付自动失效-死信队列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#TTL过期时间单位为ms,设置10s方便测试
mq.kill.item.success.kill.expire=10000


在config下新建RabbitmqConfig类


package com.sdxb.secondkill.config;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
    @Autowired
    private Environment env;
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    //单一消费者
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        //设置连接工厂
        factory.setConnectionFactory(connectionFactory);
        //设置数据交换格式
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //消费者监听个数
        factory.setConcurrentConsumers(1);
        //消费端的监听最大个数
        factory.setMaxConcurrentConsumers(1);
        //设置每次发送给消费者的消息数
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }
    //多个消费者
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListennerContainer(){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //确认消费模式
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.councurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
        return factory;
    }
    //创建rabbit模板
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate template=new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("消息发送成功:correlationData("+correlationData+"),ack("+b+"),cause({"+s+"})");
            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("消息丢失:exchage("+exchange+"),route("+routingKey+"),replyCode("+replyCode+replyText+"),message("+message+")");
            }
        });
        return template;
    }
    //构建秒杀成功之后-订单超时未支付的死信队列消息模型
    //创建死信队列,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key
    @Bean
    public Queue successKillDeadQueue(){
        Map<String, Object> argsMap= Maps.newHashMap();
        argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
        argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
    }
    //基本交换机
    @Bean
    public TopicExchange successKillDeadProdExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
    }
    //创建基本交换机+基本路由 -> 死信队列 的绑定
    @Bean
    public Binding successKillDeadProdBinding(){
        return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
    }
    //真正的队列
    @Bean
    public Queue successKillRealQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
    }
    //死信交换机
    @Bean
    public TopicExchange successKillDeadExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
    }
    //死信交换机+死信路由->真正队列 的绑定
    @Bean
    public Binding successKillDeadBinding(){
        return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    }
}


在上面这段代码中,前三个Bean属于通用配置,死信队列中需要有两个参数x-dead-letter-exchange和x-dead-letter-routing-key,分别表示死信交换机和死信路由,在官网中对这两个参数如何编写有教程:



剩下的操作和图中的流程一样,基本交换机和基本路由绑定死信队列,生产者通过基本交换机和基本路由把消息发送到死信队列中,死信队列由死信交换机、死信路由和过期时间(TTL)组成,并绑定到真实队列里,过期时间到了之后自动发送到所绑定的真实队列中,消费者消费消息,即将订单失效。

(四)编写生产者和消费者

首先建一个包含订单号和用户信息的DTO,在dto下新建一个KillSuccessUserDto



@Data
@ToString
public class KillSuccessUserDto extends ItemKillSuccess implements Serializable {
    private String code;
    private String userName;
    private String phone;
    private String email;
    private String itemName;
}


在itemKillSuccessMapper中增加一条查询语句,通过订单号查询抢购成功表


@Select("select a.*,b.user_name,b.phone,b.email,c.name as itemName\n" +
        "from item_kill_success as a\n" +
        "left join user b on b.id=a.user_id\n" +
        "left join item c on c.id=a.item_id\n" +
        "where a.code=#{orderNo} and b.is_active=1")
KillSuccessUserDto selectByCode(String orderNo);


在Service下新建RabbitSenderService,作为消息的发送者,首先查询订单是否已经存在,如果存在,就将它送入死信队列,设置死信队列的TTL。在这里为了方便验证,设置TTL为10S


@Service
public class RabbitSenderService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private Environment env;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单
     * @param orderCode
     */
    public void sendKillSuccessOrderExpireMsg(final String orderCode){
        try {
            if (StringUtils.isNotBlank(orderCode)){
                ////查询订单是否存在
                KillSuccessUserDto info=itemKillSuccessMapper.selectByCode(orderCode);
                if (info!=null){
                    //将该消息送入死信队列,并且为了方便测试设置TTL为10秒,这里的ttl在application.properties中配置
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
                    rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            MessageProperties mp=message.getMessageProperties();
                            mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserDto.class);
                            //TODO:动态设置TTL(为了测试方便,暂且设置10s)
                            mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
                            return message;
                        }
                    });
                }
            }
        }catch (Exception e){
            log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e.fillInStackTrace());
        }
    }
}


死信队列的TTL失效后会将消息发送到真实队列中,再写一个消息的接收者来处理业务,在Service下新建RabbitReceiveService,作为消息的消费者,将成功订单的status设置为-1,表示失效。


@Service
public class RabbitReceiveService {
    public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    /**
     * 用户秒杀成功后超时未支付-监听者
     * @param info
     */
    @RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
    public void consumeExpireOrder(KillSuccessUserDto info){
        try {
            log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);
            if (info!=null){
                ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
                if (entity!=null && entity.getStatus().intValue()==0){
                    //将成功订单的status设置为-1,表示失效
                    itemKillSuccessMapper.expireOrder(info.getCode());
                }
            }
        }catch (Exception e){
            log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
        }
    }
}


最后在KillServiceImpl中添加抢购成功后的业务:


if (itemKillSuccessMapper.countByKillUserId(itemKill.getId(),userId) <= 0){
    int res=itemKillSuccessMapper.insertSelective(entity);
    if(res>0){
        //处理抢购成功后的流程
        //这里的业务可以自己加
        //将订单送入死信队列
        rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
    }
}


(五)额外的处理方式

除使用RabbitMQ让订单失效以外,我们还可以使用定时任务不断轮询数据库,如果发现超过了失效时间但是status还是等于0的情况,则另这个属性等于-1

在config下新建SchedulerConfig,通过线程池注册定时任务


@Configuration
public class SchedulerConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //通过线程池注册定时任务
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}


在Service下新建SchedulerService,定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效。


@Configuration
public class SchedulerService {
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    @Autowired
    private Environment env;
    //定时获取status=0的订单并判断是否超过了支付订单时间,然后进行失效
    @Scheduled(cron = "0/30 * * * * ?")
    public void schedulerExpireOrders() {
        List<ItemKillSuccess> list = itemKillSuccessMapper.selectExpireOrders();
        if(list!=null&&!list.isEmpty()){
            for (ItemKillSuccess item : list) {
                if (item!=null && item.getStatus().intValue()==0 && item.getDiffTime()>env.getProperty("scheduler.expire.orders.time",Integer.class)){
                    itemKillSuccessMapper.expireOrder(item.getCode());
                }
            }
        }
    }
}


在配置文件中增加下面这条:


#定时任务订单失效时间,单位秒
scheduler.expire.orders.time=30


最后要让定时任务生效还需要在启动类中增加一条注解


@EnableScheduling



(六) 效果展示

当刚下订单后,status为0,表示订单抢购但未支付


过了指定之间后,status变为-1,表示订单已经失效


到目前的代码的代码https://link.juejin.cn/?target=https%3A%2F%2Fgithub.com%2FOliverLiy%2FSecondKill%2Ftree%2Fversion5.0


我搭建了一个微信公众号《Java鱼仔》,分享大量java知识点与学习经历,如果你对本项目有任何疑问,欢迎在公众号中联系我,我会尽自己所能为大家解答。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的服装商城管理系统
基于Java+Springboot+Vue开发的服装商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的服装商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
153 2
基于Java+Springboot+Vue开发的服装商城管理系统
|
11天前
|
XML Java 数据库连接
SpringBoot集成Flowable:打造强大的工作流管理系统
在企业级应用开发中,工作流管理是一个核心组件,它能够帮助我们定义、执行和管理业务流程。Flowable是一个开源的工作流和业务流程管理(BPM)平台,它提供了强大的工作流引擎和建模工具。结合SpringBoot,我们可以快速构建一个高效、灵活的工作流管理系统。本文将探讨如何将Flowable集成到SpringBoot应用中,并展示其强大的功能。
45 1
|
20天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
30天前
|
存储 安全 Java
打造智能合同管理系统:SpringBoot与电子签章的完美融合
【10月更文挑战第7天】 在数字化转型的浪潮中,电子合同管理系统因其高效、环保和安全的特点,正逐渐成为企业合同管理的新宠。本文将分享如何利用SpringBoot框架实现一个集电子文件签字与合同管理于一体的智能系统,探索技术如何助力合同管理的现代化。
61 4
|
30天前
|
前端开发 Java Apache
SpringBoot实现电子文件签字+合同系统!
【10月更文挑战第15天】 在现代企业运营中,合同管理和电子文件签字成为了日常活动中不可或缺的一部分。随着技术的发展,电子合同系统因其高效性、安全性和环保性,逐渐取代了传统的纸质合同。本文将详细介绍如何使用SpringBoot框架实现一个电子文件签字和合同管理系统。
53 1
|
1月前
|
文字识别 安全 Java
SpringBoot3.x和OCR构建车牌识别系统
本文介绍了一个基于Java SpringBoot3.x框架的车牌识别系统,详细阐述了系统的设计目标、需求分析及其实现过程。利用Tesseract OCR库和OpenCV库,实现了车牌图片的识别与处理,确保系统的高准确性和稳定性。文中还提供了具体的代码示例,展示了如何构建和优化车牌识别服务,以及如何处理特殊和异常车牌。通过实际应用案例,帮助读者理解和应用这一解决方案。
|
2月前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的大学竞赛报名管理系统
基于Java+Springboot+Vue开发的大学竞赛报名管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的大学竞赛报名管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
219 3
基于Java+Springboot+Vue开发的大学竞赛报名管理系统
|
2月前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的蛋糕商城管理系统
基于Java+Springboot+Vue开发的蛋糕商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的蛋糕商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
156 3
基于Java+Springboot+Vue开发的蛋糕商城管理系统
|
2月前
|
前端开发 JavaScript Java
基于Java+Springboot+Vue开发的美容预约管理系统
基于Java+Springboot+Vue开发的美容预约管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的美容预约管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。
54 3
基于Java+Springboot+Vue开发的美容预约管理系统
|
2月前
|
JavaScript Java 关系型数据库
毕设项目&课程设计&毕设项目:基于springboot+vue实现的在线考试系统(含教程&源码&数据库数据)
本文介绍了一个基于Spring Boot和Vue.js实现的在线考试系统。随着在线教育的发展,在线考试系统的重要性日益凸显。该系统不仅能提高教学效率,减轻教师负担,还为学生提供了灵活便捷的考试方式。技术栈包括Spring Boot、Vue.js、Element-UI等,支持多种角色登录,具备考试管理、题库管理、成绩查询等功能。系统采用前后端分离架构,具备高性能和扩展性,未来可进一步优化并引入AI技术提升智能化水平。
毕设项目&课程设计&毕设项目:基于springboot+vue实现的在线考试系统(含教程&源码&数据库数据)