redis集群+JedisCluster+lua脚本实现分布式锁

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: redis集群+JedisCluster+lua脚本实现分布式锁


  1. 依赖包引入
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
  1. 配置信息
#redis集群连接配置
spring.redis.cluster.nodes=192.168.0.15:6379,192.168.0.15:6380,192.168.0.15:6381,192.168.0.15:6382,192.168.0.15:6383,192.168.0.15:6384
#redis
spring.redis.cluster.max-redirects=6
spring.redis.jedis.pool.max-active=80
spring.redis.jedis.pool.max-idle=30
spring.redis.jedis.pool.max-wait=2000s
spring.redis.jedis.pool.min-idle=10
  1. JedisCluster配置
/**
 * @author m7
 * @date 2018年12月19日
 **/
@Configuration
public class RedisDistributeLockConfig {
    @Value("${spring.redis.cluster.nodes}")
    String redisNodes;
    @Bean
    //定义分布式锁对象
    public RedisDistributeLock redisDistributeLock(JedisCluster jedisCluster){
        return new RedisDistributeLock(jedisCluster);
    }
    @Bean
    //定义JedisCluster操作bean
    public JedisCluster jedisCluster(){
        return new JedisCluster(pharseHostAnport());
    }
    private Set<HostAndPort> pharseHostAnport(){
        if (StringUtils.isEmpty(redisNodes)){
            throw new RuntimeException("redis nodes can't be null or empty");
        }
        String[] hps = redisNodes.split(",");
        Set<HostAndPort> hostAndPorts = new HashSet<>();
        for (String hp : hps) {
            String[] hap = hp.split(":");
            hostAndPorts.add(new HostAndPort(hap[0], Integer.parseInt(hap[1])));
        }
        return hostAndPorts;
    }
}
  1. 分布式锁实现
/**
 * JedisCluster + lua脚本实现分布式锁
 * @author m7
 * @date 2018年12月19日
 **/
public class RedisDistributeLock {
    private Logger logger = LoggerFactory.getLogger(RedisDistributeLock.class);
    private JedisCluster jedisCluster;
    /**
     * lua脚本:判断锁住值是否为当前线程持有,是的话解锁,不是的话解锁失败
     */
    private static final String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if" +
            " redis.call('get', KEYS[1]) == ARGV[1]" +
            " then" +
            " return redis.call('del', KEYS[1])" +
            " else" +
            " return 0" +
            " end";
    private volatile String unlockSha1 = "";
    private static final Long UNLOCK_SUCCESS_CODE = 1L;
    private static final String LOCK_SUCCESS_CODE = "ok";
    public RedisDistributeLock(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }
    /**
     * 根据loopTryTime循环重试
     * @param lockKey 锁key
     * @param lockVal 锁值,用于解锁校验
     * @param expiryTime 锁过期时间
     * @param loopTryTime 获取失败时,循环重试获取锁的时长
     * @return 是否获得锁
     */
    public boolean tryLock(String lockKey, String lockVal, long expiryTime, long loopTryTime){
        Long endTime = System.currentTimeMillis() + loopTryTime;
        while (System.currentTimeMillis() < endTime){
            if (tryLock(lockKey, lockVal, expiryTime)){
                return true;
            }
        }
        return false;
    }
    /**
     * 根据loopTryTime循环重试
     * @param lockKey 锁key
     * @param lockVal 锁值,用于解锁校验
     * @param expiryTime 锁过期时间
     * @param retryTimes 重试次数
     * @param setpTime 每次重试间隔 mills
     * @return 是否获得锁
     */
    public boolean tryLock(String lockKey, String lockVal, long expiryTime, int retryTimes, long setpTime){
        while (retryTimes > 0){
            if (tryLock(lockKey, lockVal, expiryTime)){
                return true;
            }
            retryTimes--;
            try {
                Thread.sleep(setpTime);
            } catch (InterruptedException e) {
                logger.error("get distribute lock error" +e.getLocalizedMessage());
            }
        }
        return false;
    }
    /**
     * 一次尝试,快速失败。不支持重入
     * @param lockKey 锁key
     * @param lockVal 锁值,用于解锁校验
     * @param expiryTime 锁过期时间 MILLS
     * @return 是否获得锁
     */
    public boolean tryLock(String lockKey, String lockVal, long expiryTime){
        //相比一般的分布式锁,这里把setNx和setExpiry操作合并到一起,jedis保证原子性,避免连个命令之间出现宕机等问题
        //这里也可以我们使用lua脚本实现
        String result = jedisCluster.set(lockKey, lockVal, "NX", "PX", expiryTime);
        return LOCK_SUCCESS_CODE.equalsIgnoreCase(result);
    }
    /**
     * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间
     * @param lockKey 锁key
     * @param lockVal 锁值
     * @return 是否释放成功
     */
    public boolean tryUnLock(String lockKey, String lockVal){
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        List<String> argv = new ArrayList<>();
        argv.add(lockVal);
        try {
            Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
            return UNLOCK_SUCCESS_CODE.equals(result);
        }catch (JedisNoScriptException e){
            //没有脚本缓存时,重新发送缓存
            logger.info("try to store script......");
            storeScript(lockKey);
            Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
            return UNLOCK_SUCCESS_CODE.equals(result);
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 由于使用redis集群,因此每个节点都需要各自缓存一份脚本数据
     * @param slotKey 用来定位对应的slot的slotKey
     */
    public void storeScript(String slotKey){
        if (StringUtils.isEmpty(unlockSha1) || !jedisCluster.scriptExists(unlockSha1, slotKey)){
            //redis支持脚本缓存,返回哈希码,后续可以继续用来调用脚本
            unlockSha1 = jedisCluster.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL, slotKey);
        }
    }
}

分析

  • 加锁操作
    比一般的redis分布式锁,这里操作jedis的操作方式进行加锁,好处就是Jedis保证set与设置有效期两个操作之间的原子性,避免在set值之后,程序宕机,导致没有设置过期时间,锁就一直被锁住。

这一步操作我们单独使用lua脚本实现也可以,但是幸好jedis已经帮我们进行实现。

/**
 * 一次尝试,快速失败。不支持重入
 * @param lockKey 锁key
 * @param lockVal 锁值,用于解锁校验
 * @param expiryTime 锁过期时间 MILLS
 * @return 是否获得锁
 */
public boolean tryLock(String lockKey, String lockVal, long expiryTime){
    //相比一般的分布式锁,这里把setNx和setExpiry操作合并到一起,jedis保证原子性,避免连个命令之间出现宕机等问题
    //这里也可以我们使用lua脚本实现
    //NX表示setNX操作,PX表示过期时间是mills
    String result = jedisCluster.set(lockKey, lockVal, "NX", "PX", expiryTime);
    return LOCK_SUCCESS_CODE.equalsIgnoreCase(result);
}

同时加锁操作也有几个简单的重载实现,分别是重试获取和循环获取锁的重载,根据业务场景适当调整使用。

  • 解锁操作
    这里的分布式锁的解锁操作使用lua脚本帮助实现。
    我们知道,分布式锁在解锁时一定需要验证是不是锁的持有者,这种情况下,我们需要进行的操作就有获取key的对应value,然后验证value的值,这个过程,存在一种情况,导致误删别的持有者的锁。分析如下的操作顺序图

    上面的操作顺序可能出错的情况就是当lock1尝试释放时,先获取值,判断是否是锁的持有者,如果是,就再发指令删除锁。这个过程可能存在问题就是,lock1在获取值之后,刚好到了有效期了,那么锁可能会在此时被锁竞争者2获得,并且设置锁lock2,然而这时锁竞争者1删除锁的指令刚好重新发送到redis-server,就会误删lock2,导致后续会被其他锁竞争者3获取,发送不可知业务错误。
    使用lua脚本的好处就是保证redis指令之间执行的原子性,把get和del执行放在脚本中,保证不会误删别的锁竞争者的锁,假如刚好出现get之后锁值过期,最多就是del操作结果为0,不会出现误删结果。
/**
     * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间
     * @param lockKey 锁key
     * @param lockVal 锁值
     * @return 是否释放成功
     */
    public boolean tryUnLock(String lockKey, String lockVal){
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        List<String> argv = new ArrayList<>();
        argv.add(lockVal);
        try {
            Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
            return UNLOCK_SUCCESS_CODE.equals(result);
        }catch (JedisNoScriptException e){
            //没有脚本缓存时,重新发送脚本并缓存
            logger.info("try to store script......");
            storeScript(lockKey);
            //重试获取
            Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
            return UNLOCK_SUCCESS_CODE.equals(result);
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
  • 解锁脚本
/**
 * lua脚本:判断锁住值是否为当前线程持有,是的话解锁,不是的话解锁失败
 */
private static final String DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL = "if" +
        " redis.call('get', KEYS[1]) == ARGV[1]" +
        " then" +
        " return redis.call('del', KEYS[1])" +
        " else" +
        " return 0" +
        " end";
  • lua脚本缓存
    在redis集群中,为了避免重复发送脚本数据浪费网络资源,可以使用script load命令进行脚本数据缓存,并且返回一个哈希码作为脚本的调用句柄,每次调用脚本只需要发送哈希码来调用即可。
127.0.0.1:6381> script load "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"
"e9f69f2beb755be68b5e456ee2ce9aadfbc4ebf4"
  • 上面是在redis-cli中缓存脚本的方式,在程序中,存储lua脚本的方式是如下所示。使用jedis可以很方便就完成脚本缓存,先判断脚本缓存是否存在,不存在就进行脚本数据缓存并且保存哈希码,以备接下来调用脚本。

注意:需要注意的是,在redis集群环境下,每个节点都需要进行一份脚本缓存,否则就会出现

NOSCRIPT No matching script. Please use EVAL. 
  • 错误,因此我在程序中加了处理。
/**
 * 由于使用redis集群,因此每个节点都需要各自缓存一份脚本数据
 * @param slotKey 用来定位对应的slot的slotKey
 */
public void storeScript(String slotKey){
    if (StringUtils.isEmpty(unlockSha1) || !jedisCluster.scriptExists(unlockSha1, slotKey)){
        //redis支持脚本缓存,返回哈希码,后续可以继续用来调用脚本
        unlockSha1 = jedisCluster.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL, slotKey);
    }
}
  • slotKey就是我们set值时的key,redis根据crc16函数 计算key应该对应哪一个slot,如果slot所在的redis节点没有缓存脚本数据就会报处NOSCRIPT No matching script. Please use EVAL.异常,因此当捕捉到这个异常时,我们在代码中重新发送脚本数据进行缓存即可。
/**
 * 释放分布式锁,释放失败最可能是业务执行时间长于lockKey过期时间,应当结合业务场景调整过期时间
 * @param lockKey 锁key
 * @param lockVal 锁值
 * @return 是否释放成功
 */
public boolean tryUnLock(String lockKey, String lockVal){
    List<String> keys = new ArrayList<>();
    keys.add(lockKey);
    List<String> argv = new ArrayList<>();
    argv.add(lockVal);
    try {
        Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
        return UNLOCK_SUCCESS_CODE.equals(result);
    }catch (JedisNoScriptException e){
        //没有脚本缓存时,重新发送脚本并缓存
        //根据lockkey计算slot,在对应redis节点重新缓存一份脚本数据
        logger.info("try to store script......");
        storeScript(lockKey);
        //重试获取
        Object result = jedisCluster.evalsha(unlockSha1, keys, argv);
        return UNLOCK_SUCCESS_CODE.equals(result);
    }catch (Exception e){
        e.printStackTrace();
        return false;
    }
}
  • 测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ActivityServiceApplication.class})
@Slf4j
public class ActivityServiceApplicationTests {
    @Resource
    private RedisDistributeLock redisDistributeLock;
    @Test
    public void testRedislock() throws InterruptedException {
        for(int i=0;i < 50;i++){
            int finalI = i;
            new Thread(() ->{
                    if (redisDistributeLock.tryLock("TEST_LOCK_KEY", "TEST_LOCK_VAL_"+ finalI, 1000* 100, 1000*20)){
                        try {
                            log.warn("get lock successfully with lock value:-----" + "TEST_LOCK_VAL_"+ finalI);
                            Thread.sleep(2000);
                            if (!redisDistributeLock.tryUnLock("TEST_LOCK_KEY", "TEST_LOCK_VAL_"+ finalI)){
                                throw new RuntimeException("release lock fail");
                            }
                            log.warn("release lock successfully with lock value:-----" + "TEST_LOCK_VAL_"+ finalI);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }else {
                        log.warn("get lock fail with lock value:-----" + "TEST_LOCK_VAL_"+ finalI);
                    }
            }).start();
        }
        Thread.sleep(1000*1000);
    }
}

设置50个线程尝试获取分布式锁,每个线程尝试时间为20秒;获取到锁的线程,sleep2秒,然后释放锁。

最终会出现,10个线程能够依次获得锁,40个线程获取锁超时失败。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
29天前
|
缓存 NoSQL Redis
Redis 脚本
10月更文挑战第18天
33 3
|
11天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
47 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
61 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
48 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
76 4
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
62 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
1月前
|
缓存 分布式计算 NoSQL
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
29 2
|
1月前
|
缓存 NoSQL 算法
面试题:Redis如何实现分布式锁!
面试题:Redis如何实现分布式锁!
下一篇
无影云桌面