剖析分布式锁

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 我们不生产代码,我们是代码的搬运工前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下

我们不生产代码,我们是代码的搬运工

前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下

天才是1%的灵感,加上99%的汗水;编程是1%的编码,加上99%的在Google/StackOverflow/Github上找代码 残酷的现实是,找来的代码可能深藏bug,而不知

image.png

在多核多线程环境中,通过锁机制,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性

怎么样才是把好锁?

可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。 这把锁要是一把可重入锁(避免死锁) 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用的获取锁和释放锁功能 获取锁和释放锁的性能要好

分布式锁三要素

  1. 外部存储
    分布式锁是在分布式部署环境中给多个主机提供锁服务,需要另外的存储载体
  2. 全局唯一标识
    在多线程环境中,锁可以使一个对象引用,也可以是变量,都有唯一的标识来区分锁保护的不同资源; 在分布式环境下,也需要,比如对某一特定用户资源操作,业务+userId即可唯一标识
  3. 至少有两种状态,获取和释放
    锁至少需要两种状态:加锁(lock)和解锁(unlock)。 用状态区分当前尝试获取的锁是否已经被其他操作占用, 被占用只有等待锁释放后才能尝试获取锁并加锁,保护共享资源

实现

理论知识知道得再多,还得落地才行;只要遵从三要素,就能打造一把好锁,不要拘泥于某一种工具。

网上有很多实现方式,主要是”外部存储“使用了不同的组件,比如数据库,redis,zk,由于这些组件各自特性的不同,实现复杂度各有不同

这儿主要说下在实际工作中使用到的两种方式,数据库与redis

数据库

数据库,任何系统都需要的组件,常规手法,都是使用version来实现乐观锁

version

image.png

比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题

假设数据库中帐户信息表中有一个version字段,当前值为1;而当前帐户余额字段(balance)为1000元。假设操作员A先更新完,操作员B后更新。 a、操作员A此时将其读出(version=1),并从其帐户余额中增加100(1000+100=1100)。 b、在操作员A操作的过程中,操作员B也读入此用户信息(version=1),并从其帐户余额中扣除50(1000-50=950)。 c、操作员A完成了修改工作,将数据版本号加一(version=2),连同帐户增加后余额(balance=1100),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录version更新为2。 d、操作员B完成了操作,也将版本号加一(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足 “提交版本必须大于记录当前版本才能执行更新 “的乐观锁策略,因此,操作员B的提交被驳回。 这样,就避免了操作员B用基于version=1的旧数据修改的结果覆盖操作员A的操作结果的可能。

set balance=1100,version=version+1 where id=#{id} and version=#{version};

version简单,除了对业务数据表有侵入性,还有一些场景是胜任不了

比如,在操作一个数量之前,需要确认一下能不能操作

int countLimit = select count from limit where id = ${id};
if(countlimit>0){
    set balance=1100,version=version+1 where id=#{id} and version=#{version};
}
update count;

这儿操作了多张表,此时就需要再配合事务,才能保证原子性

redis

由于db性能的限制,而redis性能卓越,很多时候会选择redis实现方式

怎么使用redis正确地实现分布式锁,需要了解两方面

  1. 实现分布式锁时,使用到的redis命令
  2. 网上示例可能都有毒

redis命令

setnx 命令(『SET if Not eXists』(如果不存在,则 SET)的简写): 设置成功,返回 1 设置失败,返回 0 该命令是原子操作

getset 命令: 自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。 返回值:返回之前的旧值,如果之前Key不存在将返回nil。 该命令是原子操作。

get 命令: get获取key的值,如果存在,则返回;如果不存在,则返回nil;

del 命令: del删除key及key对应的值,如果key不存在,程序忽略

SET 命令: set key value [EX seconds] [PX milliseconds] [NX|XX] 将字符串值 value 关联到 key  如果 key 已经持有其他值, SET 就覆写旧值,无视类型。 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

可选参数从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value

PX millisecond:设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value

XX:只在键已经存在时,才对键进行设置操作。

示例

原来项目中使用分布式锁,整个逻辑:

  1. setnx(lockkey, 当前时间+过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转向 2。
  2. get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 3。
  3. 计算 newExpireTime = 当前时间+过期超时时间,然后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前 getset 设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  4. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

获取锁

private boolean acquireLock(Jedis j,String lock) throws Exception{
    int timeOut = timeoutSeconds*1000;
    boolean acquired = false;
    long start = System.currentTimeMillis();
    int times = 0;
    do {
        String value = String.valueOf(System.currentTimeMillis() + timeOut + 1);
        // 第一个得到这个锁
        if (j.setnx(lock, value) == 1) {
            logger.info("第一次获取全局锁:{} 成功", lock);
            acquired = true;
            break;
        }
        // j.expire(lock, timeoutSeconds); 网络抖动,可能失败
        String currentValue = j.get(lock);
        // 小于时,可能是上次没有清除,自上次超时后没有别的线程操作过
        if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {
            // 这是同步操作,只会一个成功
            String oldValue = j.getSet(lock, value);
            // 别的线程没有赋上值,当前成功得到锁
            if (oldValue != null && oldValue.equals(currentValue)) {
                acquired = true;
                logger.info("获取全局锁:{} 成功,尝试了{}次,经过了{}ms",lock,times,System.currentTimeMillis()-start);
                break;
            }
        }
        times++;
        Thread.sleep(100);
    } while (start + timeOut > System.currentTimeMillis());
    if(!acquired){
        logger.info("获取全局锁:{} 失败,尝试了{}次",lock,times);
    }            
    return acquired;
}

解锁

private void releaseLock(Jedis j,String lock){
    String currentValue = j.get(lock);
    if(currentValue != null){
        if(System.currentTimeMillis() < Long.valueOf(currentValue) ){
            j.del(lock);
            logger.info("释放锁{}",lock);
        }
    }
}

示例缺陷

特地从多年前的项目中把这段代码找出来,当年写完,心里还挺美

网上有很多资料也是差不多样的,但事实并不那么完美,甚至是错误的

加锁
  • 使用jedis.setnx()和jedis.expire()组合实现加锁
Long result = jedis.setnx(lockKey, value); 
 if (result == 1) { 
     // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁 
     jedis.expire(lockKey, expireTime); 
 }

这个问题很明显,setnx与expire不是同一个事务,不俱备原子性;程序崩溃或者网络抖动都会出现死锁问题

  • System.currentTimeMillis()这个需要各个client时间必须一致,一旦不一致,就可能加锁失败
  • getSet()如果锁为了灵活性,会把timeout作为入参

当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖

解锁
  • jedis.del()直接删除

这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

有种错误改进,增加参数传入requestId

public static void releaseLock(Jedis jedis, String lockKey, String requestId) { 
    // 判断加锁与解锁是不是同一个客户端 
    if (requestId.equals(jedis.get(lockKey))) { 
        // 若在此时,这把锁突然不是这个客户端的,则会误解锁 
        jedis.del(lockKey);
    } 
}

还是原子性的问题如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了

缺陷总结

心里认为本来很简单的事,代码大概:

Lock lock = DistributedReentrantLock.newLock("testlock11");//定义testlock11为key的锁,默认可重入锁
if(lock.tryLock()){
    try{
     xxxxxx
    }finally{
      lock.unlock(); //释放testlock11为key的锁,释放需要放在finally里,防止出异常导致锁没有及时释放
    }
  }

为了提高性能,通过redis原子性接口SETNX:

  1. 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间释放锁
  3. 使用DEL命令将锁数据删除

结果为了弥补setnx()与expire()两个接口的原子性问题,引入了一堆问题,外强中干

缺陷修正

加锁

Redis 2.6.12版本后,增强了set()命令

/**
 * 尝试获取分布式锁
 * @param jedis Redis客户端
 * @param lockKey 锁
 * @param requestId 请求标识
 * @param expireTime 超期时间
 * @return 是否获取成功
 */
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
    String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time), 这个set()方法一共有五个入参:

  1. 第一个为key,我们使用key来当锁,因为key是唯一的
  2. 第二个为value,我们传的是requestId,通过给value赋值为requestId,就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成
  3. 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  4. 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定
  5. 第五个为time,与第四个参数相呼应,代表key的过期时间

高可用:

  1. set()加入了NX参数,可以保证如果已有key存在,则不会调用成功,也就是只有一个客户端能持有锁,满足互斥性
  2. 由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁
  3. 将value赋值为requestId,代表加锁的客户端请求标识,那么在解锁的时候就可以进行校验是否是同一个客户端,防止锁交叉
解锁
/**
 * 释放分布式锁
 * @param jedis Redis客户端
 * @param lockKey 锁
 * @param requestId 请求标识
 * @return 是否释放成功
 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)

使用eval()配置lua保证原子性

在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令

有效时间

为什么需要一个有效时间呢?主要就是防止死锁

疑难

  • 执行业务代码操作共享资源的时间大于设置锁的过期时间?

客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是10s,那么接口超时大概设置5-50ms

【虽然能解决问题,但时间设置成了难点,微服务中多少接口,而且接口的timeout都是可配置的,不能每次调整接口timeout时,还是考虑一下锁的timeout】

  • GC的STW
    image.png

客户端1获得了锁,正准备处理共享资源的时候,发生了Full GC直到锁过期。这样,客户端2又获得了锁,开始处理共享资源。在客户端2处理的时候,客户端1 Full GC完成,也开始处理共享资源,这样就出现了2个客户端都在处理共享资源的情况

续命丸

引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以3min间隔周期进行锁续约

这样如果应用重启了,最多3min等待时间,不会因为时间太长导致的死锁问题,也不会因为时间太短导致被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

Redission

虾总给了总结性阐述:

首先启动Daemon线程,一直循环检测所有的分布式key,异步递延分布锁的过期时间,只要在处理业务逻辑,就递延分布锁过期时间3min。 每次添加分布式锁key,同时会生成一个uuid token,定义一个ConcurrentHashMap构造一个全局map维护所有的分布式key,上面Daemon线程会遍历这个map,每次解锁需要比对这个token,token一致才能解锁。 这样以来如果应用重启了,最多会有3min等待时间,不会导致时间太长导致的死锁问题,也不会因为时间太短导致的被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

跟随虾总思路,找到了一个开源组件:Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

相对于平时使用的jedis,redission进行比较高的抽象

redission中的lock主要是RLock接口,继承的juc的Lock接口

public interface RLock extends Lock, RExpirable, RLockAsync

image.png

Lock

先看lock(),有两种形式,一个不带leaseTime,一个带leaseTime

public void lock() ;
public void lock(long leaseTime, TimeUnit unit) ;

边看源码,边解释

两个方法共用了lockInterruptibly()

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}
  1. 尝试获取锁tryAcquire
  2. 获取失败,订阅此channel的消息(订阅的意义,在解锁时就会发现)
  3. 进入循环,不停的尝试获取锁,其中使用了JUC的Semaphore
  4. 一旦获取成功,则跳出循环
  5. 取消订阅

尝试获取锁tryAcquire里面会用到两个核心方法tryAcquireAsync(),tryLockInnerAsync()

  1. @Override
1. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
2.     if (leaseTime != -1) {
3.         return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
4.     }
5.     RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
6.     ttlRemainingFuture.addListener(new FutureListener<Long>() {
7.         @Override
8.         public void operationComplete(Future<Long> future) throws Exception {
9.             if (!future.isSuccess()) {
10.                 return;
11.             }
12. 
13.             Long ttlRemaining = future.getNow();
14.             // lock acquired
15.             if (ttlRemaining == null) {
16.                 scheduleExpirationRenewal(threadId);
17.             }
18.         }
19.     });
20.     return ttlRemainingFuture;
21. }
• 1.根据锁的持续时间不同,处理也不同
• 2.没有设置持续时间,那就是阻塞型,一直等待
• 2.1.为了防止业务方法执行时间超过锁timeout,则定时续约scheduleExpirationRenewal()
• 3.设置了持续时间,则不需要进行续约
23. private void scheduleExpirationRenewal(final long threadId) {
24.     if (expirationRenewalMap.containsKey(getEntryName())) {
25.         return;
26.     }
27.     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
28.         @Override
29.         public void run(Timeout timeout) throws Exception {
30. 
31.             RFuture<Boolean> future = renewExpirationAsync(threadId);
32. 
33.             future.addListener(new FutureListener<Boolean>() {
34.                 @Override
35.                 public void operationComplete(Future<Boolean> future) throws Exception {
36.                     expirationRenewalMap.remove(getEntryName());
37.                     if (!future.isSuccess()) {
38.                         log.error("Can't update lock " + getName() + " expiration", future.cause());
39.                         return;
40.                     }
41. 
42.                     if (future.getNow()) {
43.                         // reschedule itself
44.                         scheduleExpirationRenewal(threadId);
45.                     }
46.                 }
47.             });
48.         }
49. 
50.     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
51. 
52.     if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
53.         task.cancel();
54.     }
55. }
56. 
57. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
58.     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
59.             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
60.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
61.                 "return 1; " +
62.             "end; " +
63.             "return 0;",
64.         Collections.<Object>singletonList(getName()), 
65.         internalLockLeaseTime, getLockName(threadId));
66. }
67. 以internalLockLeaseTime/3间隔时间,定时续约
68. 如果当前client自身有并发时,通过putIfAbsent保证只有一个task
69. 续约:当lock存在时,使用pexpire设置过期时间
70.  RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
71.     internalLockLeaseTime = unit.toMillis(leaseTime);
72. 
73.     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
74.               "if (redis.call('exists', KEYS[1]) == 0) then " +
75.                   "redis.call('hset', KEYS[1], ARGV[2], 1); " +
76.                   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
77.                   "return nil; " +
78.               "end; " +
79.               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
80.                   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
81.                   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
82.                   "return nil; " +
83.               "end; " +
84.               "return redis.call('pttl', KEYS[1]);",
85.                 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
86. }
87. 
88. protected String getLockName(long threadId) { 
89.     return id + ":" + threadId;
90. }
    • 1.lockname不存在
    • 1.1.hset(lockname,uuid+threadid,1),value=uuid+threadid,有uuid可以区分各个client,有threadid区分各个线程,这样锁就具备了可重入性
    • 1.2.pexpire设置过期时间,防止client挂掉,造成死锁
    • 2.lockname存在
    • 2.1.hexists(lockname,uuid+threadid),这样保证了是同一个锁在同一个client
    • 2.2.hincrby 再次进锁,计数器+1
    • 2.3.pexpire 再次设置超时
    • 3.lockname存在,并且不在同一client
    • 3.1.pttl 返回剩余有效时长

unLock
1. @Override
2. public RFuture<Void> unlockAsync(final long threadId) {
3.     final RPromise<Void> result = new RedissonPromise<Void>();
4.     RFuture<Boolean> future = unlockInnerAsync(threadId);
5. 
6.     future.addListener(new FutureListener<Boolean>() {
7.         @Override
8.         public void operationComplete(Future<Boolean> future) throws Exception {
9.             if (!future.isSuccess()) {
10.                 cancelExpirationRenewal(threadId);
11.                 result.tryFailure(future.cause());
12.                 return;
13.             }
14. 
15.             Boolean opStatus = future.getNow();
16.             if (opStatus == null) {
17.                 IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
18.                         + id + " thread-id: " + threadId);
19.                 result.tryFailure(cause);
20.                 return;
21.             }
22.             if (opStatus) {
23.                 cancelExpirationRenewal(null);
24.             }
25.             result.trySuccess(null);
26.         }
27.     });
28. 
29.     return result;
30. }
31. 
32. void cancelExpirationRenewal(Long threadId) {
33.     ExpirationEntry task = expirationRenewalMap.get(getEntryName());
34.     if (task != null && (threadId == null || task.getThreadId() == threadId)) {
35.         expirationRenewalMap.remove(getEntryName());
36.         task.getTimeout().cancel();
37.     }
38. }
39. 从方法名看,虽然对外好像是直接解锁,但内部是异步执行的
40. unlockInnerAsync()进行解锁
41. 从expirationRenewalMap移除,并把task.cancel()
42. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
43.     return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
44.             "if (redis.call('exists', KEYS[1]) == 0) then " +
45.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
46.                 "return 1; " +
47.             "end;" +
48.             "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
49.                 "return nil;" +
50.             "end; " +
51.             "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
52.             "if (counter > 0) then " +
53.                 "redis.call('pexpire', KEYS[1], ARGV[2]); " +
54.                 "return 0; " +
55.             "else " +
56.                 "redis.call('del', KEYS[1]); " +
57.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
58.                 "return 1; "+
59.             "end; " +
60.             "return nil;",
61.             Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
62. 
63. }
64. lockname不存在,说明已经解锁,publish channelname unlockmessage;return 1
65. lockname存在,但对于uuid+id不存在,说明不是加锁的client,return nil
66. lockname存在,并且是当前加锁client
67. 对lockname uuid+id进行-1,如果counter>0则走5,如果=0 则走6
68. counter>0 说明锁重入了,计数器-1,并expire
69. counter=0 说明最终解锁,直接del key,并publish channelname unlockmessage;return 1

redission缺陷
使用cluster时
一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。
redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错
redlock实现会更复杂,但从他的算法上看,有zk选举的味道。对于更高可用分布锁,可以借助zk本身特性去实现

    总结

    对于锁,主要考虑性能与安全,即要保持锁的活跃性,又得保证锁的安全性

    分布式锁,除了以上两点,还要考虑实现时的三要素

    对于redission,对于锁部分的源码,还有很多的内容,很多的细节需要挖掘,此篇就不写了,太长。

    后面再结合JUC,写篇更详细的源码分析

    参考资料

    Redis分布式锁的正确实现方式

    redission

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
NoSQL 算法 Java
分布式锁那点事
分布式锁那点事
24 1
|
3月前
|
存储 NoSQL 关系型数据库
分布式锁实现
分布式锁实现
21 0
初识Redission分布式锁
在微服务系统中,某些场景需要阻塞所有节点的所有线程,对共享资源的访问。比如并发时“超卖”和“余额减为负数”等情况,需要对同一资源进行加锁,这些就需要进行分布式。
初识Redission分布式锁
|
2月前
分布式锁 使用注意点
分布式锁 使用注意点
32 2
|
3月前
|
缓存 分布式计算 NoSQL
分布式锁是什么
分布式锁是什么
31 0
|
4月前
|
NoSQL Cloud Native 中间件
什么是分布式锁?他解决了什么样的问题?
什么是分布式锁?他解决了什么样的问题?
|
7月前
|
Java Maven
Redission 实现分布式锁
Redission 实现分布式锁
115 1
|
8月前
|
缓存 NoSQL 安全
浅谈分布式锁
浅谈分布式锁
60 0
|
10月前
|
存储 NoSQL 算法
这样实现分布式锁,才叫优雅!
这样实现分布式锁,才叫优雅!
69 0
|
10月前
|
存储 NoSQL 算法
如何优雅的实现分布式锁
如何优雅的实现分布式锁
74 0