分布式 Redis & RabbitMQ 终极秒杀

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 上期我们使用阻塞队列和分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是RabbitMQ.

♨️本篇文章记录的为RabbitMQ知识中企业级项目中秒杀相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

上期我们使用阻塞队列分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 而且还存在很多问题, 比如说这时候突然停电了,我们队列里的消息没来得及被消费就消失了.这就要出大问题.

现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是RabbitMQ.

@[TOC]

🐇思路分析

  1. 我们先把MYSQL数据库中的订单数量在redis中保存一份.
  2. 在秒杀的业务中我们还是先执行Lua脚本, 判断我们是否具有购买资格, 如果没有购买资格我们直接返回结果;
  3. 如果有有购买资格,Lua脚本会直接进行redis中的订单扣减操作,进行秒杀
  4. 同时在redis的扣减操作完成后, 把下单信息保存到阻塞队列进行异步执行, 实现MYSQL数据库中订单扣减的同步操作,完美.

在这里插入图片描述

🐇代码操作

思路听起来很简单,下面我们来进行具体操作.

先引入RabbitMQ的核心依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在yml文件中进行相关配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
      retry:
        #发布重试,默认false
        enabled: true
        #重试时间 默认1000ms
        initial-interval: 1000
        #重试最大次数 最大3
        max-attempts: 3
        #重试最大间隔时间
        max-interval: 10000
        #重试的时间隔乘数,比如配20 第一次等于10s,第二次等于20s,第三次等于40s
        multiplier: 1
    listener:
      # 默认配置是simple
      type: simple
      simple:
        # 手动ack Acknowledge mode of container. auto none
        acknowledge-mode: manual
        #消费者调用程序线程的最小数量
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者每次只处理一条信息,处理完在继续下一条
        prefetch: 1
        #启动时是否默认启动容器
        auto-startup: true
        #被拒绝时重新进入队列
        default-requeue-rejected: true

配置RabbitMQConfig文件

@Slf4j
@Configuration
public class RabbitMQConfig {
   
   

    @Bean("RabbitTemplate")
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
   
   
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
   
   
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   
   
                log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("ConfirmCallback:     "+"确认情况:"+ack);
                log.info("ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
   
   

            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
   
   
                log.info("return 执行了....");
                log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage().toString());
                log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());
            }
        });

        return rabbitTemplate;
    }

    /**
     * Json转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
   
   
        return new Jackson2JsonMessageConverter();
    }

    //交换机名称
    public static final String SECKILL_EXCHANGE ="seckill_exchange";

    //队列名称
    public static final String ORDER_QUEUE ="order_queue";

    @Bean("ORDER_QUEUE")
    Queue confirmQueue(){
   
   
        return QueueBuilder.durable(ORDER_QUEUE).build();
    }

    /**
     * 创建一个交换机
     * @return
     */
    @Bean("SECKILL_EXCHANGE")
    Exchange confirmExchange(){
   
   
        return ExchangeBuilder.topicExchange(SECKILL_EXCHANGE).durable(true).build();
    }

    @Bean
    Binding confirmExchange(@Qualifier("SECKILL_EXCHANGE") Exchange exchange,
                            @Qualifier("ORDER_QUEUE") Queue queue){
   
   
        //bind(queue) 绑定队列 to(exchange) 绑定交换机 with("") routingKey这里绑定的是空白可以按照自己的要求绑定  noargs() 没有参数
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

补充 : RabbitMQ的Json转换器尤为重要, 因为rabbitMQ中发送和接收的都是字符串/字节数组类型的消息, 所以我们要把java对象转成json串进行发送.

Lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

下面是秒杀业务代码

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
   
   

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Resource(description = "RabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
   
   
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private IVoucherOrderService proxy;


    public void handleVoucherOrder(VoucherOrder voucherOrder) {
   
   
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
   
   
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
   
   
            //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
            createVoucherOrder(voucherOrder);
        } finally {
   
   
            // 释放锁
            redisLock.unlock();
        }
    }


    @Override
    public Result seckillVoucher(Long voucherId) {
   
   
        //获取用户
        Long userId = UserHolder.getUser().getId();

        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
   
   
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入mq
        rabbitTemplate.convertAndSend("seckill_exchange", "order.voucher", voucherOrder);
        // 3.获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 4.返回订单id
        return Result.ok(orderId);
    }


    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
   
   
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = Math.toIntExact(query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count());
        // 5.2.判断是否存在
        if (count > 0) {
   
   
            // 用户已经购买过了
            log.error("不允许重复下单!");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
   
   
            // 扣减失败
            log.error("库存不足!");
            return;
        }

        // 7.创建订单
        save(voucherOrder);

    }

}

总得有个消费者来监听我们的队列

消费者代码

@Slf4j
@Component
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
   
   

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;

    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
   
   

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
   
   
            //1.接收转换消息
            ObjectMapper mapper = new ObjectMapper();
            VoucherOrder voucherOrder = mapper.readValue(message.getBody(), VoucherOrder.class);
            //2. 处理业务逻辑
            try {
   
   
                // 2.创建订单

                voucherOrderService.handleVoucherOrder(voucherOrder);
                System.out.println("订单已执行");
            } catch (Exception e) {
   
   
                log.error("处理订单异常", e);
            }
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
   
   

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);

        }
    }
}

代码部分就这么些, 思路并不算复杂

在这里插入图片描述

🐇展示结果

在这里插入图片描述

这里显示抢购成功了

我们再看看redis和mysql中情况如何

在这里插入图片描述

在这里插入图片描述
显而易见, 表中对应的订单数量也都扣减了, 完美.

🐇补充

RabbitMQ中JSON格式转换为实体对象

//将JSON格式数据转换为实体对象
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), VoucherOrder.class);

RabbitMQ中JSON格式转换为Map对象

 @Test
    public void sender() throws AmqpException
    {
   
   
        //创建用户信息Map
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("userId", "1");
        userMap.put("userName", "阿千弟的博客");
        userMap.put("blogUrl", "https://blog.csdn.net/qq_51033936");
        userMap.put("userRemark", "您好,欢迎访问 阿千弟的博客");

        /**
         * 发送消息,参数说明:
         * String exchange:交换器名称。
         * String routingKey:路由键。
         * Object object:发送内容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
//将JSON格式数据转换为Map对象
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
   
   

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;

    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
   
   

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

       //将JSON格式数据转换为Map对象
            ObjectMapper mapper = new ObjectMapper();
            JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
            Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);

            System.out.println("接收者收到Map格式消息:");
            System.out.println("用户编号:" + resultMap.get("userId"));
            System.out.println("用户名称:" + resultMap.get("userName"));
            System.out.println("博客地址:" + resultMap.get("blogUrl"));
            System.out.println("博客信息:" + resultMap.get("userRemark"));

            //确认消息
            channel.basicAck(deliveryTag, true)
        } catch (Exception e) {
   
   

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。
在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
8天前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
Redis,分布式缓存演化之路
|
2月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
360 7
|
2月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
210 5
|
3月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
101 8
|
3月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
83 16
|
4月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
3月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
68 5
|
4月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
310 11
|
4月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
129 11