分布式 Redis & RabbitMQ 终极秒杀

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 上期我们使用阻塞队列和分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是RabbitMQ.

♨️本篇文章记录的为RabbitMQ知识中企业级项目中秒杀相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

上期我们使用阻塞队列分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 而且还存在很多问题, 比如说这时候突然停电了,我们队列里的消息没来得及被消费就消失了.这就要出大问题.

现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是RabbitMQ.

@[TOC]

🐇思路分析

  1. 我们先把MYSQL数据库中的订单数量在redis中保存一份.
  2. 在秒杀的业务中我们还是先执行Lua脚本, 判断我们是否具有购买资格, 如果没有购买资格我们直接返回结果;
  3. 如果有有购买资格,Lua脚本会直接进行redis中的订单扣减操作,进行秒杀
  4. 同时在redis的扣减操作完成后, 把下单信息保存到阻塞队列进行异步执行, 实现MYSQL数据库中订单扣减的同步操作,完美.

在这里插入图片描述

🐇代码操作

思路听起来很简单,下面我们来进行具体操作.

先引入RabbitMQ的核心依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在yml文件中进行相关配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
      retry:
        #发布重试,默认false
        enabled: true
        #重试时间 默认1000ms
        initial-interval: 1000
        #重试最大次数 最大3
        max-attempts: 3
        #重试最大间隔时间
        max-interval: 10000
        #重试的时间隔乘数,比如配20 第一次等于10s,第二次等于20s,第三次等于40s
        multiplier: 1
    listener:
      # 默认配置是simple
      type: simple
      simple:
        # 手动ack Acknowledge mode of container. auto none
        acknowledge-mode: manual
        #消费者调用程序线程的最小数量
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者每次只处理一条信息,处理完在继续下一条
        prefetch: 1
        #启动时是否默认启动容器
        auto-startup: true
        #被拒绝时重新进入队列
        default-requeue-rejected: true

配置RabbitMQConfig文件

@Slf4j
@Configuration
public class RabbitMQConfig {
   
   

    @Bean("RabbitTemplate")
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
   
   
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
   
   
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   
   
                log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("ConfirmCallback:     "+"确认情况:"+ack);
                log.info("ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
   
   

            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
   
   
                log.info("return 执行了....");
                log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage().toString());
                log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());
            }
        });

        return rabbitTemplate;
    }

    /**
     * Json转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
   
   
        return new Jackson2JsonMessageConverter();
    }

    //交换机名称
    public static final String SECKILL_EXCHANGE ="seckill_exchange";

    //队列名称
    public static final String ORDER_QUEUE ="order_queue";

    @Bean("ORDER_QUEUE")
    Queue confirmQueue(){
   
   
        return QueueBuilder.durable(ORDER_QUEUE).build();
    }

    /**
     * 创建一个交换机
     * @return
     */
    @Bean("SECKILL_EXCHANGE")
    Exchange confirmExchange(){
   
   
        return ExchangeBuilder.topicExchange(SECKILL_EXCHANGE).durable(true).build();
    }

    @Bean
    Binding confirmExchange(@Qualifier("SECKILL_EXCHANGE") Exchange exchange,
                            @Qualifier("ORDER_QUEUE") Queue queue){
   
   
        //bind(queue) 绑定队列 to(exchange) 绑定交换机 with("") routingKey这里绑定的是空白可以按照自己的要求绑定  noargs() 没有参数
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

补充 : RabbitMQ的Json转换器尤为重要, 因为rabbitMQ中发送和接收的都是字符串/字节数组类型的消息, 所以我们要把java对象转成json串进行发送.

Lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

下面是秒杀业务代码

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
   
   

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Resource(description = "RabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
   
   
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private IVoucherOrderService proxy;


    public void handleVoucherOrder(VoucherOrder voucherOrder) {
   
   
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
   
   
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
   
   
            //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
            createVoucherOrder(voucherOrder);
        } finally {
   
   
            // 释放锁
            redisLock.unlock();
        }
    }


    @Override
    public Result seckillVoucher(Long voucherId) {
   
   
        //获取用户
        Long userId = UserHolder.getUser().getId();

        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
   
   
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入mq
        rabbitTemplate.convertAndSend("seckill_exchange", "order.voucher", voucherOrder);
        // 3.获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 4.返回订单id
        return Result.ok(orderId);
    }


    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
   
   
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = Math.toIntExact(query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count());
        // 5.2.判断是否存在
        if (count > 0) {
   
   
            // 用户已经购买过了
            log.error("不允许重复下单!");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
   
   
            // 扣减失败
            log.error("库存不足!");
            return;
        }

        // 7.创建订单
        save(voucherOrder);

    }

}

总得有个消费者来监听我们的队列

消费者代码

@Slf4j
@Component
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
   
   

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;

    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
   
   

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
   
   
            //1.接收转换消息
            ObjectMapper mapper = new ObjectMapper();
            VoucherOrder voucherOrder = mapper.readValue(message.getBody(), VoucherOrder.class);
            //2. 处理业务逻辑
            try {
   
   
                // 2.创建订单

                voucherOrderService.handleVoucherOrder(voucherOrder);
                System.out.println("订单已执行");
            } catch (Exception e) {
   
   
                log.error("处理订单异常", e);
            }
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
   
   

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);

        }
    }
}

代码部分就这么些, 思路并不算复杂

在这里插入图片描述

🐇展示结果

在这里插入图片描述

这里显示抢购成功了

我们再看看redis和mysql中情况如何

在这里插入图片描述

在这里插入图片描述
显而易见, 表中对应的订单数量也都扣减了, 完美.

🐇补充

RabbitMQ中JSON格式转换为实体对象

//将JSON格式数据转换为实体对象
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), VoucherOrder.class);

RabbitMQ中JSON格式转换为Map对象

 @Test
    public void sender() throws AmqpException
    {
   
   
        //创建用户信息Map
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("userId", "1");
        userMap.put("userName", "阿千弟的博客");
        userMap.put("blogUrl", "https://blog.csdn.net/qq_51033936");
        userMap.put("userRemark", "您好,欢迎访问 阿千弟的博客");

        /**
         * 发送消息,参数说明:
         * String exchange:交换器名称。
         * String routingKey:路由键。
         * Object object:发送内容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
//将JSON格式数据转换为Map对象
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
   
   

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;

    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
   
   

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

       //将JSON格式数据转换为Map对象
            ObjectMapper mapper = new ObjectMapper();
            JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
            Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);

            System.out.println("接收者收到Map格式消息:");
            System.out.println("用户编号:" + resultMap.get("userId"));
            System.out.println("用户名称:" + resultMap.get("userName"));
            System.out.println("博客地址:" + resultMap.get("blogUrl"));
            System.out.println("博客信息:" + resultMap.get("userRemark"));

            //确认消息
            channel.basicAck(deliveryTag, true)
        } catch (Exception e) {
   
   

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。
在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
14天前
|
NoSQL Java 关系型数据库
【Redis系列笔记】分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
112 2
|
9天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
110 16
探秘Redis分布式锁:实战与注意事项
|
11天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
11天前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
12天前
|
NoSQL Redis 微服务
分布式锁_redis实现
分布式锁_redis实现
|
16天前
|
NoSQL Java Redis
Redis入门到通关之分布式锁Rediision
Redis入门到通关之分布式锁Rediision
15 0
|
16天前
|
NoSQL 关系型数据库 MySQL
Redis入门到通关之Redis实现分布式锁
Redis入门到通关之Redis实现分布式锁
18 1
|
机器学习/深度学习 缓存 NoSQL
|
缓存 NoSQL Java
为什么分布式一定要有redis?
1、为什么使用redis 分析:博主觉得在项目中使用redis,主要是从两个角度去考虑:性能和并发。当然,redis还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件(如zookpeer等)代替,并不是非要使用redis。
1334 0
|
1天前
|
存储 监控 NoSQL
Redis哨兵&分片集群
Redis哨兵&分片集群
6 0