redisson实现分布式锁原理

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

Redisson分布式锁

之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。

不同版本实现锁的机制并不相同

引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理。

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.2.3</version>
</dependency>

setnx需要配合getset以及事务来完成,这样才能比较好的避免死锁问题,而新版本由于支持lua脚本,可以避免使用事务以及操作多个redis命令,语义表达更加清晰一些。

RLock接口的特点

继承标准接口Lock

拥有标准锁接口的所有特性,比如lock,unlock,trylock等等。

扩展标准接口Lock

扩展了很多方法,常用的主要有:强制锁释放,带有效期的锁,还有一组异步的方法。其中前面两个方法主要是解决标准lock可能造成的死锁问题。比如某个线程获取到锁之后,线程所在机器死机,此时获取了锁的线程无法正常释放锁导致其余的等待锁的线程一直等待下去。

可重入机制

各版本实现有差异,可重入主要考虑的是性能,同一线程在未释放锁时如果再次申请锁资源不需要走申请流程,只需要将已经获取的锁继续返回并且记录上已经重入的次数即可,与jdk里面的ReentrantLock功能类似。重入次数靠hincrby命令来配合使用,详细的参数下面的代码。

怎么判断是同一线程?
redisson的方案是,RedissonLock实例的一个guid再加当前线程的id,通过getLockName返回

public class RedissonLock extends RedissonExpirable implements RLock {   
    final UUID id;    protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {        super(commandExecutor, name);        this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);        this.commandExecutor = commandExecutor;        this.id = id;
    }    String getLockName(long threadId) {        return this.id + ":" + threadId;
    }

RLock获取锁的两种场景

这里拿tryLock的源码来看:tryAcquire方法是申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请直接获取并返回,如果获取到时间,则进入等待竞争逻辑。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {        long time = unit.toMillis(waitTime);        long current = System.currentTimeMillis();        final long threadId = Thread.currentThread().getId();        Long ttl = this.tryAcquire(leaseTime, unit);        if(ttl == null) {            //直接获取到锁
            return true;
        } else {            //有竞争的后续看
        }
    }

无竞争,直接获取锁

先看下首先获取锁并释放锁背后的redis都在做什么,可以利用redis的monitor来在后台监控redis的执行情况。当我们在方法了增加@RequestLockable之后,其实就是调用lock以及unlock,下面是redis命令:

  • 加锁
    由于高版本的redis支持lua脚本,所以redisson也对其进行了支持,采用了脚本模式,不熟悉lua脚本的可以去查找下。执行lua命令的逻辑如下:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {        this.internalLockLeaseTime = unit.toMillis(leaseTime);        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});
    }

加锁的流程:

  1. 判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。

  2. 判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。

  3. 被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。

"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);"

 "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 
 "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"

上面的lua脚本会转换成真正的redis命令,下面的是经过lua脚本运算之后实际执行的redis命令。

1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"
  • 解锁
    解锁的流程看起来复杂些:

  1. 如果lock键不存在,发消息说锁已经可用

  2. 如果锁不是被当前线程锁定,则返回nil

  3. 由于支持可重入,在解锁时将重入次数需要减1

  4. 如果计算后的重入次数>0,则重新设置过期时间

  5. 如果计算后的重入次数<=0,则发消息说锁已经可用

"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;""2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"
 "0" "1000"
 "346e1eb8-5bfd-4d49-9870-042df402f248:21"

无竞争情况下解锁redis命令:
主要是发送一个解锁的消息,以此唤醒等待队列中的线程重新竞争锁。

1486642678.493691 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"1486642678.493712 [0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0"

有竞争,等待

有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。

  • this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败

  • this.await返回true,进入循环尝试获取锁。

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {        long time = unit.toMillis(waitTime);        long current = System.currentTimeMillis();        final long threadId = Thread.currentThread().getId();        Long ttl = this.tryAcquire(leaseTime, unit);        if(ttl == null) {            return true;
        } else {            //重点是这段
            time -= System.currentTimeMillis() - current;            if(time <= 0L) {                return false;
            } else {
                current = System.currentTimeMillis();                final RFuture subscribeFuture = this.subscribe(threadId);                if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {                    if(!subscribeFuture.cancel(false)) {
                        subscribeFuture.addListener(new FutureListener() {                            public void operationComplete(Future<RedissonLockEntry> future) throws Exception {                                if(subscribeFuture.isSuccess()) {                                    RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                                }

                            }
                        });
                    }                    return false;
                } else {                    boolean var16;                    try {
                        time -= System.currentTimeMillis() - current;                        if(time <= 0L) {                            boolean currentTime1 = false;                            return currentTime1;
                        }                        do {                            long currentTime = System.currentTimeMillis();
                            ttl = this.tryAcquire(leaseTime, unit);                            if(ttl == null) {
                                var16 = true;                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;                            if(time <= 0L) {
                                var16 = false;                                return var16;
                            }

                            currentTime = System.currentTimeMillis();                            if(ttl.longValue() >= 0L && ttl.longValue() < time) {                                this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
                            } else {                                this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        var16 = false;
                    } finally {                        this.unsubscribe(subscribeFuture, threadId);
                    }                    return var16;
                }
            }
        }
    }

循环尝试一般有如下几种方法:

  1. while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。

  2. Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。

  3. 基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。

redisson依赖

由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。



本文转自 sshpp 51CTO博客,原文链接:http://blog.51cto.com/12902932/1924602,如需转载请自行联系原作者

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
相关文章
|
4月前
|
NoSQL 调度 Redis
分布式锁—3.Redisson的公平锁
Redisson公平锁(RedissonFairLock)是一种基于Redis实现的分布式锁,确保多个线程按申请顺序获取锁,从而实现公平性。其核心机制是通过队列和有序集合管理线程的排队顺序。加锁时,线程会进入队列并等待,锁释放后,队列中的第一个线程优先获取锁。RedissonFairLock支持可重入加锁,即同一线程多次加锁不会阻塞。新旧版本在排队机制上有所不同,新版本在5分钟后才会重排队列,而旧版本在5秒后就会重排。释放锁时,Redisson会移除队列中等待超时的线程,并通知下一个排队的线程获取锁。通过这种机制,RedissonFairLock确保了锁的公平性和顺序性。
|
2月前
|
NoSQL Java Redis
基于Redisson和自定义注解的分布式锁实现策略。
在实现分布式锁时,保证各个组件配置恰当、异常处理充足、资源清理彻底是至关重要的。这样保障了在分布布局场景下,锁的正确性和高效性,使得系统的稳健性得到增强。通过这种方式,可以有效预防并发环境下的资源冲突问题。
160 29
|
NoSQL 安全 调度
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
Redisson 的 MultiLock 是一种分布式锁实现,支持对多个独立的 RLock 同时加锁或解锁。它通过“整锁整放”机制确保所有锁要么全部加锁成功,要么完全回滚,避免状态不一致。适用于跨多个 Redis 实例或节点的场景,如分布式任务调度。其核心逻辑基于遍历加锁列表,失败时自动释放已获取的锁,保证原子性。解锁时亦逐一操作,降低死锁风险。MultiLock 不依赖 Lua 脚本,而是封装多锁协调,满足高一致性需求的业务场景。
134 0
【📕分布式锁通关指南 10】源码剖析redisson之MultiLock的实现
|
4月前
|
NoSQL 调度 Redis
分布式锁—5.Redisson的读写锁
Redisson读写锁(RedissonReadWriteLock)是Redisson提供的一种分布式锁机制,支持读锁和写锁的互斥与并发控制。读锁允许多个线程同时获取,适用于读多写少的场景,而写锁则是独占锁,确保写操作的互斥性。Redisson通过Lua脚本实现锁的获取、释放和重入逻辑,并利用WatchDog机制自动续期锁的过期时间,防止锁因超时被误释放。 读锁的获取逻辑通过Lua脚本实现,支持读读不互斥,即多个线程可以同时获取读锁。写锁的获取逻辑则确保写写互斥和读写互斥,即同一时间只能有一个线程获取写锁,
273 17
|
5月前
|
负载均衡 NoSQL 算法
Redisson分布式锁数据一致性解决方案
通过以上的设计和实现, Redisson能够有效地解决分布式环境下数据一致性问题。但是, 任何技术都不可能万无一失, 在使用过程中还需要根据实际业务需求进行逻辑屏障的设计和错误处理机制的建立。
268 48
|
4月前
|
算法 NoSQL Redis
分布式锁—4.Redisson的联锁和红锁
Redisson的MultiLock和RedLock机制为分布式锁提供了强大的支持。MultiLock允许一次性锁定多个资源,确保在更新这些资源时不会被其他线程干扰。它通过将多个锁合并为一个大锁,统一进行加锁和释放操作。RedissonMultiLock的实现通过遍历所有锁并尝试加锁,若在超时时间内无法获取所有锁,则释放已获取的锁并重试。 RedLock算法则基于多个Redis节点的加锁机制,确保在大多数节点上加锁成功即可。RedissonRedLock通过重载MultiLock的failedLocksLi
236 10
|
4月前
|
NoSQL Java Redis
分布式锁—6.Redisson的同步器组件
Redisson提供了多种分布式同步工具,包括分布式锁、Semaphore和CountDownLatch。分布式锁包括可重入锁、公平锁、联锁、红锁和读写锁,适用于不同的并发控制场景。Semaphore允许多个线程同时获取锁,适用于资源池管理。CountDownLatch则用于线程间的同步,确保一组线程完成操作后再继续执行。Redisson通过Redis实现这些同步机制,提供了高可用性和高性能的分布式同步解决方案。源码剖析部分详细介绍了这些组件的初始化和操作流程,展示了Redisson如何利用Redis命令和
|
4月前
|
NoSQL 算法 安全
分布式锁—1.原理算法和使用建议
本文主要探讨了Redis分布式锁的八大问题,包括非原子操作、忘记释放锁、释放其他线程的锁、加锁失败处理、锁重入问题、锁竞争问题、锁超时失效及主从复制问题,并提供了相应的优化措施。接着分析了Redis的RedLock算法,讨论其优缺点以及分布式专家Martin对其的质疑。此外,文章对比了基于Redis和Zookeeper(zk)的分布式锁实现原理,包括获取与释放锁的具体流程。最后总结了两种分布式锁的适用场景及使用建议,指出Redis分布式锁虽有性能优势但模型不够健壮,而zk分布式锁更稳定但部署成本较高。实际应用中需根据业务需求权衡选择。
|
4月前
|
监控 NoSQL Java
分布式锁—2.Redisson的可重入锁
本文主要介绍了Redisson可重入锁RedissonLock概述、可重入锁源码之创建RedissonClient实例、可重入锁源码之lua脚本加锁逻辑、可重入锁源码之WatchDog维持加锁逻辑、可重入锁源码之可重入加锁逻辑、可重入锁源码之锁的互斥阻塞逻辑、可重入锁源码之释放锁逻辑、可重入锁源码之获取锁超时与锁超时自动释放逻辑、可重入锁源码总结。
|
6月前
|
安全
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁
Redisson 的看门狗机制是解决分布式锁续期问题的核心功能。当通过 `lock()` 方法加锁且未指定租约时间时,默认启用 30 秒的看门狗超时时间。其原理是在获取锁后创建一个定时任务,每隔 1/3 超时时间(默认 10 秒)通过 Lua 脚本检查锁状态并延长过期时间。续期操作异步执行,确保业务线程不被阻塞,同时仅当前持有锁的线程可成功续期。锁释放时自动清理看门狗任务,避免资源浪费。学习源码后需注意:避免使用带超时参数的加锁方法、控制业务执行时间、及时释放锁以优化性能。相比手动循环续期,Redisson 的定时任务方式更高效且安全。
358 24
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁

热门文章

最新文章