Redis实现分布式锁

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis实现分布式锁

在之前并发系列的文章中,我们介绍了JVM中的锁。但是无论是synchronized还是Lock,都运行在线程级别上,必须运行在同一个JVM中。如果竞争资源的进程不在同一个JVM中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问。

分布式锁的实现一般有三种方式,使用MySql数据库行锁,基于Redis的分布式锁,以及基于Zookeeper的分布式锁。本文中我们重点看一下Redis如何实现分布式锁。

首先,看一下用于实现分布式锁的两个Redis基础命令:

setnx key value

这里的setnx,是"set if Not eXists"的缩写,表示当指定的key值不存在时,为key设定值为value。如果key存在,则设定失败。

setex key timeout value

setex命令为指定的key设置值及其过期时间(以秒为单位)。如果key已经存在,setex命令将会替换旧的值。

基于这两个指令,我们能够实现:

  • 使用setnx 命令,保证同一时刻只有一个线程能够获取到锁
  • 使用setex 命令,保证锁会超期释放,从而不因一个线程长期占有一个锁而导致死锁。

这里将两个命令结合在一起使用的原因是,在正常情况下,如果只使用setnx 命令,使用完成后使用delete命令删除锁进行释放,不存在什么问题。但是如果获取分布式锁的线程在运行中挂掉了,那么锁将不被释放。如果使用setex 设置了过期时间,即使线程挂掉,也可以自动进行锁的释放。

image.png

接下来,我们基于Redis+Spring手写实现一个分布式锁。首先配置Jedis连接池:

@Configuration
public class Config {
    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMinIdle(1);
        jedisPoolConfig.setMaxWaitMillis(2000);
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestOnReturn(true);
        JedisPool jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
        return  jedisPool;
    }
}

实现RedisLock分布式锁:

public class RedisLock implements Lock {
    @Autowired
    JedisPool jedisPool;
    private static final String key = "lock";
    private ThreadLocal<String> threadLocal = new ThreadLocal<>();
    @Override
    public void lock() {
        boolean b = tryLock();
        if (b) {
            return;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (Exception e) {
            e.printStackTrace();
        }
        lock();//递归调用
    }
    @Override
    public boolean tryLock() {
        SetParams setParams = new SetParams();
        setParams.ex(10);
        setParams.nx();
        String s = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        String lock = resource.set(key, s, setParams);
        resource.close();
        if ("OK".equals(lock)) {
            threadLocal.set(s);
            return true;
        }
        return false;
    }
    //解锁判断锁是不是自己加的
    @Override
    public void unlock(){
        //调用lua脚本解锁
        String script="if redis.call(\"get\",KEYS[1]==ARGV[1] then\n"+
                "   return redis.call(\"del\",KEYS[1])\n"+
                "else\n"+
                "   return 0\n"+
                "end";
        Jedis resource = jedisPool.getResource();
        Object eval=resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));
        if (Integer.valueOf(eval.toString())==0){
            resource.close();
            throw new RuntimeException("解锁失败");
        }
        /*
        *不写成下面这种也是因为不是原子操作,和ex、nx相同
        String s = resource.get(key);
        if (threadLocal.get().equals(s)){
            resource.del(key);
        }
        */
        resource.close();
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
    @Override
    public Condition newCondition() {
        return null;
    }
}

简单对上面代码中需要注意的地方做一解释:

  • 加锁过程中,使用SetParams 同时设置nxex的值,保证原子操作
  • 通过ThreadLocal保存key对应的value,通过value来判断锁是否当前线程自己加的,避免线程错乱解锁
  • 释放锁的过程中,使用lua脚本进行删除,保证Redis在执行此脚本时不执行其他操作,从而保证操作的原子性

但是,这段手写的代码可能会存在一个问题,就是不能保证业务逻辑一定能被执行完成,因为设置了锁的过期时间可能导致过期。

image.png

基于上面存在的问题,我们可以使用Redisson分布式可重入锁。Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。

引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.7</version>
</dependency>

配置RedissonClient

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient= Redisson.create(config);
        return redissonClient;
    }
}

下面对常用方法进行测试。方法1:

 void lock();

测试接口:

@GetMapping("/lock")
public String test() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock();
    System.out.println(Thread.currentThread().getName()+" get redisson lock");
    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    lock.unlock();
    System.out.println(Thread.currentThread().getName()+ " release lock");
   return "locked";
}

进行测试,同时发送两个请求,redisson锁生效:

image.png

方法2:

void lock(long leaseTime, TimeUnit unit);

Redisson可以给lock()方法提供leaseTime参数来指定加锁的时间,超过这个时间后锁可以自动释放。测试接口:

@GetMapping("/lock2")
public String test2() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock(10,TimeUnit.SECONDS);
    System.out.println(Thread.currentThread().getName()+" get redisson lock");
    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+ " release lock");
    return "locked";
}

运行结果:

image.png

可以看出,在第一个线程还没有执行完成时,就释放了redisson锁,第二个线程进入后,两个线程可以同时执行被锁住的代码逻辑。这样可以实现无需调用unlock方法手动解锁。

方法3:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

tryLock方法会尝试加锁,最多等待waitTime秒,上锁以后过leaseTime秒自动解锁;如果没有等待时间,锁不住直接返回false

@GetMapping("/lock3")
public String test3() {
    RLock lock = redissonClient.getLock("lock");
    try {
        boolean res = lock.tryLock(5, 30, TimeUnit.SECONDS);
        if (res){
            try{
                System.out.println(Thread.currentThread().getName()+" 获取到锁,返回true");
                System.out.println("do something");
                TimeUnit.SECONDS.sleep(20);
            }finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName()+" 释放锁");
            }
        }else {
            System.out.println(Thread.currentThread().getName()+" 未获取到锁,返回false");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "lock";
}

运行结果:

image.png

可见在第一个线程获得锁后,第二个线程超过等待时间仍未获得锁,返回false放弃获得锁的过程。

除了以上单机Redisson锁以外,还支持我们之前提到过的哨兵模式和集群模式,只需要改变Config的配置即可。以集群模式为例:

@Bean
public RedissonClient redissonClient(){
    Config config=new Config();
    config.useClusterServers().addNodeAddress("redis://172.20.5.170:7000")
        .addNodeAddress("redis://172.20.5.170:7001")
        .addNodeAddress("redis://172.20.5.170:7002")
        .addNodeAddress("redis://172.20.5.170:7003")
        .addNodeAddress("redis://172.20.5.170:7004")
        .addNodeAddress("redis://172.20.5.170:7005");
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
}

image.png

下面介绍一下Redisson红锁RedissonRedLock,该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RedissonRedLock针对的多个Redis节点,这多个节点可以是集群,也可以不是集群。当我们使用RedissonRedLock时,只要在大部分节点上加锁成功就算成功。看一下使用:

@GetMapping("/testRedLock")
public void testRedLock() {
    Config config1 = new Config();
    config1.useSingleServer().setAddress("redis://172.20.5.170:6379");
    RedissonClient redissonClient1 = Redisson.create(config1);
    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://172.20.5.170:6380");
    RedissonClient redissonClient2 = Redisson.create(config2);
    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://172.20.5.170:6381");
    RedissonClient redissonClient3 = Redisson.create(config3);
    String resourceName = "REDLOCK";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    try {
        isLock = redLock.tryLock(5, 30, TimeUnit.SECONDS);
        if (isLock) {
            System.out.println("do something");
            TimeUnit.SECONDS.sleep(20);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

相对于单Redis节点来说,RedissonRedLock的优点在于防止了单节点故障造成整个服务停止运行的情况;并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。使用RedissonRedLock性能方面会比单节点Redis分布式锁差一些,但可用性比普通锁高很多。

参考资料:

https://github.com/redisson/redisson/wiki/

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
7天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
40 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
47 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
73 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
1月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
110 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
60 4