redis实现分布式锁

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: redis实现分布式锁

redis实现分布式锁

一,redis除了缓存,还可以用来干啥

分布式锁,分布式限流,session共享,延迟队列等


二,分布式场景

1,互联网秒杀

2,抢优惠券

3,接口幂等性校验


三,解决超卖问题

1,悲观锁,通过 for update 实现提前加锁,在锁住这条记录之后,别人则无法操作当前这行记录。不过在高并发的场景下,容易出现死锁的问题,会大大降低的mysql的性能

2,乐观锁,通过添加一个version版本实现

因此不可能在mysql上面进行操作,一般的话都会在redis上面进行操作,通过redis进行一个库存的预扣减。一个减库存的伪代码如下,通过redis实现库存的预扣减。

//获取库存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
  //每人限购一个
    int realStock = stock - 1;
    //更新库存
    stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
    System.out.println("扣减成功,剩余库存:" + realStock + "");
} else {
    System.out.println("扣减失败,库存不足");
}

如果是部署在一个分布式架构或者集群上面,上面的代码就会出现问题,即会在多个tomcat下。如果只是简单的设置synchronized 的同步代码块,只能保证一个jvm下面的线程同步,即只能保证一个tomcat 下面的线程安全。因此要保证分布式场景下面线程安全,因此需要使用到分布式锁。分布式锁主要有mysql,redis,zookeeper,这里主要分析一下redis的分布式锁的使用场景以及原理流程


setnx key value

他的底层是一个原子操作, 它可以在同一时间内完成设置值和设置过期时间这两个操作, 因此 SETEX 命令在储存缓存的时候非常实用。 因此可以解决上述的分布式锁的问题可以如下

//设置锁
String lockKey = "product_001";
#获取锁,并且设置超时时间,如果业务出现异常,可以在指定时间内释放锁
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "hat", 30, TimeUnit.SECONDS);
#释放锁-
stringRedisTemplate.delete(lockKey);

由于多个线程都去抢同一把锁,所以在高并发的场景下,假设第一个线程获取锁lockKey,设置了过期时间为30s,但是如果业务逻辑需要的时间大于30s,则redis会因为这个过期时间释放锁,那么第二个线程就会获取锁成功,假设业务逻辑需要45s,那么还需要当前线程还需要15s将任务全部执行完毕,但是第二个线程获取锁后进来了,假设也需要45s,那么在第二个执行15s时,第一个线程全部执行完,就会进行锁的释放删除,但是全部线程用的是同一把锁,那么此时删除的锁就是把第二个进来的线程的锁给删除了,那么第三个线程又会进来,一直循环下去,将别的线程的锁给误删。因此有可能一直造成锁失效的问题,如果在高并发的场景下,很难保证线程的执行顺序,因此有可能会出现很大的线程安全问题。


通过上述分析,也就是说所有人都共用同一把锁,因此可以在释放锁的时候进行改进,为每个线程给予一个唯一的id进行客户端的标识,然后在进行锁删除的时候,需要通过这个唯一id进行锁的删除释放,在删除锁时需要进行这个客户端id的判断,在不出现异常的情况下,当前线程可以走完全部的业务逻辑,解决了上面锁被误删的情况,伪代码如下:

//将这个uuid作为客户端id,实现客户端的唯一标识
String clientId = UUID.randomUUID().toString();
//value则设置为这个uuid
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
//锁删除时进行判断,是否删除当前线程的锁
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
    //释放锁,如果在这行代码出现宕机,也会死锁几秒
    stringRedisTemplate.delete(lockKey);
}

在释放锁的时候,也可能出现宕机的问题或者网络延迟的问题,假设代码执行到删除锁的那行,出现服务器卡的界面,那么也可能在删除这个锁的时候会将下一个线程的锁给删除,虽然概率比较小,因此也可能出现并发安全的问题。因此需要使用接下来的redisson的方式实现,通过锁续命的方式实现这个分布式锁的安全问题。


四,redisson

4.1,需要的依赖包如下

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.6.5</version>
</dependency>

4.2,初始化redisson

@Bean
    public Redisson redisson() {
        // 此为单机模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://124.222.199.186:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

4.3,redisson使用的基本伪代码如下

//设置一个key作为锁的标志,如果key存在则表示资源被锁
String lockKey = "product_001";
//获取redis的分布式锁
RLock redissonLock = redisson.getLock(lockKey);
// 加锁,实现锁续命功能,锁时间大概在30s左右,该锁也是一个重入锁
//因此加了多少个锁,就需要解多少个锁
redissonLock.lock();
redissonLock.lock();
//解锁
redissonLock.unlock();
redissonLock.unlock();

4.4,lock方法的底层实现以及流程

//如果锁不可用,那么当前线程将被禁用,并处于休眠状态,直到获得锁。

//如果锁不可用,那么当前线程将被禁用,并处于休眠状态,直到获得锁。
void lock();

e98ff23ec277424a8b7cb0fd23bc2180.png


1,在多个线程的情况下,会有其中的一个线程获取到锁,获取锁的线程会进行加锁操作,通过主线程执行,分线程续命方式,会后台开启一个守护进程,该线程被称为watch dog,就是看门狗的意思,每隔10s检查当前线程的锁是否过期,当前线程还是否持有锁,如果持有锁的话则延长锁的时间,最后释放锁

2,如果获取锁的进程出现宕机的情况,并不会出现死锁的情况,会设置一个30s的过期时间,30s后自动释放锁

3,当持有锁的主线程宕机之后,主线程对应的守护线程也会随着被销毁,锁也就会自动释放

4,其他线程也会通过while循环,即自旋的方式获取锁,直到加锁成功

5,redis服务端会记录当前是哪个线程进行了加锁,有与时重入锁,因此会通过一个计数器来记录加锁的次数,因此在解锁的时候,加了多少次 lock 锁,就需要进行多少次 unlock 的解锁

6,redis使用了大量的原子操作,但是redis操作只保证自己的原子性,因此运用了大量的原子操作,通过lua脚本来实现redis批量操作的原子性。


4.5,redis Lua脚本

1,redis Lua脚本的优点

1,减少网络开销,即通过一次请求完成多次命令的操作

2,原子操作,可以让redis的批量操作实现原子性

3,替代redis的事务功能,支持常规事务,以及回滚操作

2,举例

//初始化商品10016的库存
jedis.set("product_stock_10016", "15");  
        //定义一个本地变量count,redis.call函数实现redis命令
String script = " local count = redis.call('get', KEYS[1]) " + 
          //转成数字类型
        " local a = tonumber(count) " +
          //ARGV[1] 对应的value
                " local b = tonumber(ARGV[1]) " +
          //判断如果a>b,则扣减库存,否则语法报错
                " if a >= b then " +
                " redis.call('set', KEYS[1], count-b) " +
                //模拟语法报错回滚操作"   bb == 0 " +
                "   return 1 " +
                " end " +
                " return 0 ";
//执行这段脚本
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
System.out.println(obj);
#因为redis是单线程,因此如果lua脚本尽量不要写的太复杂,否则可能造成后面的任务不执行

redisson 底层通过lua脚本实现redis分布式锁的原理如下。

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                       #hash存储对象
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

五,最终优化后的代码如下

private Redisson redisson;
private StringRedisTemplate stringRedisTemplate;
public String deductStock() throws InterruptedException {
        //获取锁
        String lockKey = "product_001";
      //给每一个线程一个唯一id标识
        //String clientId = UUID.randomUUID().toString();
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            //原生的方式获取锁  jedis.setnx(key,value)
            //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhenghuisheng");
            //设置超时时间
            //stringRedisTemplate.expire(lockKey,30, TimeUnit.SECONDS);
            //上面两条命令的组合
            /*Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
            if (!result) {
                return "error_code";
            }*/
            // 加锁,实现锁续命功能
            redissonLock.lock();
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock + "");
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }finally {
            redissonLock.unlock();
            /*if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
                    //释放锁,如果在这行代码出现宕机,也会死锁几秒
                    stringRedisTemplate.delete(lockKey);
            }*/
        }
        return "end";
    }

这样的话,就能保证在高并发场景下解决线程安全问题和超卖问题了。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
15天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
52 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
61 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
50 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
80 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
56 3
|
1月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
114 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
65 4
下一篇
无影云桌面