Redis之高并发超卖问题解决方案

简介: 在高并发的秒杀抢购场景中,常常会面临一个称为“超卖”(Over-Selling)的问题。超卖指的是同一件商品被售出的数量超过了实际库存数量,导致库存出现负数。这是由于多个用户同时发起抢购请求,而系统未能有效地控制库存的并发访问。

1. Redis高并发超卖问题解决方案


在高并发的秒杀抢购场景中,常常会面临一个称为“超卖”(Over-Selling)的问题。超卖指的是同一件商品被售出的数量超过了实际库存数量,导致库存出现负数。这是由于多个用户同时发起抢购请求,而系统未能有效地控制库存的并发访问。


下面进行一个秒杀购买某个商品的接口模拟,代码如下:

@RestController
public class MyController {
    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String key="product_" + id;
        int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
        if(count>0){
            stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
            System.out.println(key+"商品购买成功,剩余库存"+count);
            return "success";
        }
        System.out.println(key+"商品库存不足");
        return "error";
    }
}


上面的代码在高并发环境下容易出现超卖问题,使用JMeter进行压测,如下图:


进行压测获得的日志如下图,存在并发安全问题。



要解决上面的问题,我们一开始想到的是synchronized加锁,但是在 Redis 的高并发环境下,使用 Java 中的 synchronized关键字来解决超卖问题是行不通的,原因如下:


分布式环境下无效: synchronized是 Java 中的关键字,用于在单个 JVM 中保护共享资源。在分布式环境下,多个服务实例之间无法通过synchronized来同步,因为各个实例之间无法直接共享 JVM 中的锁。


性能问题: synchronized会导致性能问题,尤其在高并发的情况下,争夺锁可能会成为瓶颈。


对于 Redis 高并发环境下的超卖问题,更合适的解决方案通常是使用 Redis 提供的分布式锁(如基于 Redis 的分布式锁实现)。这可以确保在分布式环境中的原子性和可靠性。


基于Redis的分布式锁,我们可以基于Redis中的Setnx(命令在指定的 key 不存在时,为 key 设置指定的值),更改代码如下:


@RestController
public class MyController {
    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}


然后使用JMeter压测,在10s内陆续发送500个请求,日志如下图,由图可以看出基本解决超卖问题。



1.1 高并发场景超卖bug解析


系统在达到finally块之前崩溃宕机,锁可能会一直存在于Redis中。这可能会导致其他进程或线程无法在未来获取该锁,从而导致资源被锁定,后续尝试访问该资源的操作可能被阻塞。因此在redis中给定 key设置过期时间。代码如下:


@RestController
public class MyController {
    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock",10, TimeUnit.SECONDS); //保证原子性
//        Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
//        stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}


在高并发场景下,还存在一个问题,即业务执行时间过长可能导致 Redis 锁提前释放,并且误删除其他线程或进程持有的锁。这可能发生在以下情况:


线程A获取锁并开始执行业务逻辑。

由于高并发,其他线程B、C等也尝试获取相同资源的锁。

由于锁的过期时间设置为10秒,线程A的业务逻辑执行时间超过10秒,导致其锁被 Redis 自动释放。

线程B在10秒内获取到了之前由线程A持有的锁,并开始执行业务逻辑。

线程A在业务逻辑执行完成后,尝试删除自己的锁,但由于已经被线程B持有,线程A实际上删除的是线程B的锁。

修改代码如下:

@RestController
public class MyController {
    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String lock="product_lock_"+id;
        String key="product_" + id;
        String clientId=UUID.randomUUID().toString();
        Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, clientId,10, TimeUnit.SECONDS); //保证原子性
//        Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
//        stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
        String message="error";
        if(!lock1){
            System.out.println("业务繁忙稍后再试");
            return "业务繁忙稍后再试";
        }
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            if (stringRedisTemplate.opsForValue().get(lock).equals(clientId))//在这里如果有别的业务代码并且耗时较长, stringRedisTemplate.delete(lock)之前还是有可能超过过期时间出现问题
                stringRedisTemplate.delete(lock);
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}



上面的代码在高并发场景下仍然存在概率很低的问题,所以就有了redisson分布式锁。


1.2 Redisson


Redisson 是一个用于 Java 的 Redis 客户端,它提供了丰富的功能,包括分布式锁。Redisson 的分布式锁实现了基于 Redis 的分布式锁,具有简单易用、可靠性高的特点。


以下是 Redisson 分布式锁的一些重要特性和用法:


可重入锁: Redisson 的分布式锁是可重入的,同一线程可以多次获取同一把锁,而不会出现死锁。


公平锁: Redisson 支持公平锁,即按照获取锁的顺序依次获取,避免了某些线程一直获取不到锁的情况。


锁超时: 可以为分布式锁设置过期时间,确保即使在某些情况下锁没有被显式释放,也能在一定时间后自动释放。


异步锁: Redisson 提供了异步的分布式锁,通过异步 API 可以在不阻塞线程的情况下获取和释放锁。


监控锁状态: Redisson 允许监控锁的状态,包括锁是否被某个线程持有,锁的过期时间等。



导入依赖


 

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.23.5</version>
        </dependency>


application.yaml 配置:


spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: 1000ms


RedissonConfig配置:


@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    /**
     * RedissonClient,单机模式
     */
    @Bean
    public RedissonClient redisson() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + host + ":" + port);
        return Redisson.create(config);
    }
}


利用Redisson分布式锁解决超卖问题,修改代码如下:


@RestController
public class MyController {
    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @Autowired
    RedissonClient redisson;
    @RequestMapping("/buy/{id}")
    public String buy(@PathVariable("id") Long id){
        String message="error";
        String lock_key="product_lock_"+id;
        String key="product_" + id;
        RLock lock = redisson.getLock(lock_key);
        //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
        try {
                lock.lock();
                int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
                if (count > 0) {
                    stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
                    System.out.println(key + "商品购买成功,剩余库存" + count);
                    message="success";
                }
        }catch (Throwable e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        if(message.equals("error"))
        System.out.println(key+"商品库存不足");
        return message;
    }
}


相关文章
|
8月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2243 7
|
9月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
707 3
|
缓存 NoSQL 架构师
Redis批量查询的四种技巧,应对高并发场景的利器!
在高并发场景下,巧妙地利用缓存批量查询技巧能够显著提高系统性能。 在笔者看来,熟练掌握细粒度的缓存使用是每位架构师必备的技能。因此,在本文中,我们将深入探讨 Redis 中批量查询的一些技巧,希望能够给你带来一些启发。
Redis批量查询的四种技巧,应对高并发场景的利器!
|
存储 缓存 NoSQL
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
372 1
|
缓存 NoSQL 关系型数据库
云端问道21期实操教学-应对高并发,利用云数据库 Tair(兼容 Redis®)缓存实现极速响应
本文介绍了如何通过云端问道21期实操教学,利用云数据库 Tair(兼容 Redis®)缓存实现高并发场景下的极速响应。主要内容分为四部分:方案概览、部署准备、一键部署和完成及清理。方案概览中,展示了如何使用 Redis 提升业务性能,降低响应时间;部署准备介绍了账号注册与充值步骤;一键部署详细讲解了创建 ECS、RDS 和 Redis 实例的过程;最后,通过对比测试验证了 Redis 缓存的有效性,并指导用户清理资源以避免额外费用。
288 1
|
缓存 NoSQL Redis
Redis高并发和高可用
Redis高并发和高可用
184 0
|
监控 NoSQL Redis
怎么保证Redis的高并发高可用
怎么保证Redis的高并发高可用
237 0
|
缓存 监控 NoSQL
|
缓存 监控 NoSQL
关于Redis的几件小事 | 高并发和高可用
关于Redis的几件小事 | 高并发和高可用如果你用redis缓存技术的话,肯定要考虑如何用redis来加多台机器,保证redis是高并发的,还有就是如何让Redis保证自己不是挂掉以后就直接死掉了。
1227 0
|
9月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?

热门文章

最新文章