我们不生产代码,我们是代码的搬运工
前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下
天才是1%的灵感,加上99%的汗水;编程是1%的编码,加上99%的在Google/StackOverflow/Github上找代码 残酷的现实是,找来的代码可能深藏bug,而不知
锁
在多核多线程环境中,通过锁机制,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性
怎么样才是把好锁?
可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。 这把锁要是一把可重入锁(避免死锁) 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用的获取锁和释放锁功能 获取锁和释放锁的性能要好
分布式锁三要素
- 外部存储
分布式锁是在分布式部署环境中给多个主机提供锁服务,需要另外的存储载体 - 全局唯一标识
在多线程环境中,锁可以使一个对象引用,也可以是变量,都有唯一的标识来区分锁保护的不同资源; 在分布式环境下,也需要,比如对某一特定用户资源操作,业务+userId即可唯一标识 - 至少有两种状态,获取和释放
锁至少需要两种状态:加锁(lock)和解锁(unlock)。 用状态区分当前尝试获取的锁是否已经被其他操作占用, 被占用只有等待锁释放后才能尝试获取锁并加锁,保护共享资源
实现
理论知识知道得再多,还得落地才行;只要遵从三要素,就能打造一把好锁,不要拘泥于某一种工具。
网上有很多实现方式,主要是”外部存储“使用了不同的组件,比如数据库,redis,zk,由于这些组件各自特性的不同,实现复杂度各有不同
这儿主要说下在实际工作中使用到的两种方式,数据库与redis
数据库
数据库,任何系统都需要的组件,常规手法,都是使用version来实现乐观锁
version
比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题
假设数据库中帐户信息表中有一个version字段,当前值为1;而当前帐户余额字段(balance)为1000元。假设操作员A先更新完,操作员B后更新。 a、操作员A此时将其读出(version=1),并从其帐户余额中增加100(1000+100=1100)。 b、在操作员A操作的过程中,操作员B也读入此用户信息(version=1),并从其帐户余额中扣除50(1000-50=950)。 c、操作员A完成了修改工作,将数据版本号加一(version=2),连同帐户增加后余额(balance=1100),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录version更新为2。 d、操作员B完成了操作,也将版本号加一(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足 “提交版本必须大于记录当前版本才能执行更新 “的乐观锁策略,因此,操作员B的提交被驳回。 这样,就避免了操作员B用基于version=1的旧数据修改的结果覆盖操作员A的操作结果的可能。
set balance=1100,version=version+1 where id=#{id} and version=#{version};
version简单,除了对业务数据表有侵入性,还有一些场景是胜任不了
比如,在操作一个数量之前,需要确认一下能不能操作
int countLimit = select count from limit where id = ${id}; if(countlimit>0){ set balance=1100,version=version+1 where id=#{id} and version=#{version}; } update count;
这儿操作了多张表,此时就需要再配合事务,才能保证原子性
redis
由于db性能的限制,而redis性能卓越,很多时候会选择redis实现方式
怎么使用redis正确地实现分布式锁,需要了解两方面
- 实现分布式锁时,使用到的redis命令
- 网上示例可能都有毒
redis命令
setnx 命令(『SET if Not eXists』(如果不存在,则 SET)的简写): 设置成功,返回 1 设置失败,返回 0 该命令是原子操作
getset 命令: 自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。 返回值:返回之前的旧值,如果之前Key不存在将返回nil。 该命令是原子操作。
get 命令: get获取key的值,如果存在,则返回;如果不存在,则返回nil;
del 命令: del删除key及key对应的值,如果key不存在,程序忽略
SET 命令: set key value [EX seconds] [PX milliseconds] [NX|XX] 将字符串值 value 关联到 key 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
可选参数从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
PX millisecond:设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value
XX:只在键已经存在时,才对键进行设置操作。
示例
原来项目中使用分布式锁,整个逻辑:
- setnx(lockkey, 当前时间+过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转向 2。
- get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 3。
- 计算 newExpireTime = 当前时间+过期超时时间,然后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前 getset 设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
- 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
获取锁
private boolean acquireLock(Jedis j,String lock) throws Exception{ int timeOut = timeoutSeconds*1000; boolean acquired = false; long start = System.currentTimeMillis(); int times = 0; do { String value = String.valueOf(System.currentTimeMillis() + timeOut + 1); // 第一个得到这个锁 if (j.setnx(lock, value) == 1) { logger.info("第一次获取全局锁:{} 成功", lock); acquired = true; break; } // j.expire(lock, timeoutSeconds); 网络抖动,可能失败 String currentValue = j.get(lock); // 小于时,可能是上次没有清除,自上次超时后没有别的线程操作过 if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) { // 这是同步操作,只会一个成功 String oldValue = j.getSet(lock, value); // 别的线程没有赋上值,当前成功得到锁 if (oldValue != null && oldValue.equals(currentValue)) { acquired = true; logger.info("获取全局锁:{} 成功,尝试了{}次,经过了{}ms",lock,times,System.currentTimeMillis()-start); break; } } times++; Thread.sleep(100); } while (start + timeOut > System.currentTimeMillis()); if(!acquired){ logger.info("获取全局锁:{} 失败,尝试了{}次",lock,times); } return acquired; }
解锁
private void releaseLock(Jedis j,String lock){ String currentValue = j.get(lock); if(currentValue != null){ if(System.currentTimeMillis() < Long.valueOf(currentValue) ){ j.del(lock); logger.info("释放锁{}",lock); } } }
示例缺陷
特地从多年前的项目中把这段代码找出来,当年写完,心里还挺美
网上有很多资料也是差不多样的,但事实并不那么完美,甚至是错误的
加锁
- 使用jedis.setnx()和jedis.expire()组合实现加锁
Long result = jedis.setnx(lockKey, value); if (result == 1) { // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁 jedis.expire(lockKey, expireTime); }
这个问题很明显,setnx与expire不是同一个事务,不俱备原子性;程序崩溃或者网络抖动都会出现死锁问题
- System.currentTimeMillis()这个需要各个client时间必须一致,一旦不一致,就可能加锁失败
- getSet()如果锁为了灵活性,会把timeout作为入参
当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖
解锁
- jedis.del()直接删除
这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的
有种错误改进,增加参数传入requestId
public static void releaseLock(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是不是同一个客户端 if (requestId.equals(jedis.get(lockKey))) { // 若在此时,这把锁突然不是这个客户端的,则会误解锁 jedis.del(lockKey); } }
还是原子性的问题如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了
缺陷总结
心里认为本来很简单的事,代码大概:
Lock lock = DistributedReentrantLock.newLock("testlock11");//定义testlock11为key的锁,默认可重入锁 if(lock.tryLock()){ try{ xxxxxx }finally{ lock.unlock(); //释放testlock11为key的锁,释放需要放在finally里,防止出异常导致锁没有及时释放 } }
为了提高性能,通过redis原子性接口SETNX:
- 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
- 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间释放锁
- 使用DEL命令将锁数据删除
结果为了弥补setnx()与expire()两个接口的原子性问题,引入了一堆问题,外强中干
缺陷修正
加锁
Redis 2.6.12版本后,增强了set()命令
/** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; }
加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time), 这个set()方法一共有五个入参:
- 第一个为key,我们使用key来当锁,因为key是唯一的
- 第二个为value,我们传的是requestId,通过给value赋值为requestId,就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成
- 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
- 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定
- 第五个为time,与第四个参数相呼应,代表key的过期时间
高可用:
- set()加入了NX参数,可以保证如果已有key存在,则不会调用成功,也就是只有一个客户端能持有锁,满足互斥性
- 由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁
- 将value赋值为requestId,代表加锁的客户端请求标识,那么在解锁的时候就可以进行校验是否是同一个客户端,防止锁交叉
解锁
/** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; }
首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)
使用eval()配置lua保证原子性
在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令
有效时间
为什么需要一个有效时间呢?主要就是防止死锁
疑难
- 执行业务代码操作共享资源的时间大于设置锁的过期时间?
客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是10s,那么接口超时大概设置5-50ms
【虽然能解决问题,但时间设置成了难点,微服务中多少接口,而且接口的timeout都是可配置的,不能每次调整接口timeout时,还是考虑一下锁的timeout】
- GC的STW
客户端1获得了锁,正准备处理共享资源的时候,发生了Full GC直到锁过期。这样,客户端2又获得了锁,开始处理共享资源。在客户端2处理的时候,客户端1 Full GC完成,也开始处理共享资源,这样就出现了2个客户端都在处理共享资源的情况
续命丸
引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以3min间隔周期进行锁续约
这样如果应用重启了,最多3min等待时间,不会因为时间太长导致的死锁问题,也不会因为时间太短导致被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的
Redission
虾总给了总结性阐述:
首先启动Daemon线程,一直循环检测所有的分布式key,异步递延分布锁的过期时间,只要在处理业务逻辑,就递延分布锁过期时间3min。 每次添加分布式锁key,同时会生成一个uuid token,定义一个ConcurrentHashMap构造一个全局map维护所有的分布式key,上面Daemon线程会遍历这个map,每次解锁需要比对这个token,token一致才能解锁。 这样以来如果应用重启了,最多会有3min等待时间,不会导致时间太长导致的死锁问题,也不会因为时间太短导致的被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的
跟随虾总思路,找到了一个开源组件:Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
相对于平时使用的jedis,redission进行比较高的抽象
redission中的lock主要是RLock接口,继承的juc的Lock接口
public interface RLock extends Lock, RExpirable, RLockAsync
Lock
先看lock(),有两种形式,一个不带leaseTime,一个带leaseTime
public void lock() ; public void lock(long leaseTime, TimeUnit unit) ;
边看源码,边解释
两个方法共用了lockInterruptibly()
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
- 尝试获取锁tryAcquire
- 获取失败,订阅此channel的消息(订阅的意义,在解锁时就会发现)
- 进入循环,不停的尝试获取锁,其中使用了JUC的Semaphore
- 一旦获取成功,则跳出循环
- 取消订阅
尝试获取锁tryAcquire里面会用到两个核心方法tryAcquireAsync(),tryLockInnerAsync()
@Override
1. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { 2. if (leaseTime != -1) { 3. return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); 4. } 5. RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 6. ttlRemainingFuture.addListener(new FutureListener<Long>() { 7. @Override 8. public void operationComplete(Future<Long> future) throws Exception { 9. if (!future.isSuccess()) { 10. return; 11. } 12. 13. Long ttlRemaining = future.getNow(); 14. // lock acquired 15. if (ttlRemaining == null) { 16. scheduleExpirationRenewal(threadId); 17. } 18. } 19. }); 20. return ttlRemainingFuture; 21. } • 1.根据锁的持续时间不同,处理也不同 • 2.没有设置持续时间,那就是阻塞型,一直等待 • 2.1.为了防止业务方法执行时间超过锁timeout,则定时续约scheduleExpirationRenewal() • 3.设置了持续时间,则不需要进行续约 23. private void scheduleExpirationRenewal(final long threadId) { 24. if (expirationRenewalMap.containsKey(getEntryName())) { 25. return; 26. } 27. Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { 28. @Override 29. public void run(Timeout timeout) throws Exception { 30. 31. RFuture<Boolean> future = renewExpirationAsync(threadId); 32. 33. future.addListener(new FutureListener<Boolean>() { 34. @Override 35. public void operationComplete(Future<Boolean> future) throws Exception { 36. expirationRenewalMap.remove(getEntryName()); 37. if (!future.isSuccess()) { 38. log.error("Can't update lock " + getName() + " expiration", future.cause()); 39. return; 40. } 41. 42. if (future.getNow()) { 43. // reschedule itself 44. scheduleExpirationRenewal(threadId); 45. } 46. } 47. }); 48. } 49. 50. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 51. 52. if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) { 53. task.cancel(); 54. } 55. } 56. 57. protected RFuture<Boolean> renewExpirationAsync(long threadId) { 58. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 59. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 60. "redis.call('pexpire', KEYS[1], ARGV[1]); " + 61. "return 1; " + 62. "end; " + 63. "return 0;", 64. Collections.<Object>singletonList(getName()), 65. internalLockLeaseTime, getLockName(threadId)); 66. } 67. 以internalLockLeaseTime/3间隔时间,定时续约 68. 如果当前client自身有并发时,通过putIfAbsent保证只有一个task 69. 续约:当lock存在时,使用pexpire设置过期时间 70. RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { 71. internalLockLeaseTime = unit.toMillis(leaseTime); 72. 73. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, 74. "if (redis.call('exists', KEYS[1]) == 0) then " + 75. "redis.call('hset', KEYS[1], ARGV[2], 1); " + 76. "redis.call('pexpire', KEYS[1], ARGV[1]); " + 77. "return nil; " + 78. "end; " + 79. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 80. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 81. "redis.call('pexpire', KEYS[1], ARGV[1]); " + 82. "return nil; " + 83. "end; " + 84. "return redis.call('pttl', KEYS[1]);", 85. Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); 86. } 87. 88. protected String getLockName(long threadId) { 89. return id + ":" + threadId; 90. }
• 1.lockname不存在 • 1.1.hset(lockname,uuid+threadid,1),value=uuid+threadid,有uuid可以区分各个client,有threadid区分各个线程,这样锁就具备了可重入性 • 1.2.pexpire设置过期时间,防止client挂掉,造成死锁 • 2.lockname存在 • 2.1.hexists(lockname,uuid+threadid),这样保证了是同一个锁在同一个client • 2.2.hincrby 再次进锁,计数器+1 • 2.3.pexpire 再次设置超时 • 3.lockname存在,并且不在同一client • 3.1.pttl 返回剩余有效时长
unLock 1. @Override 2. public RFuture<Void> unlockAsync(final long threadId) { 3. final RPromise<Void> result = new RedissonPromise<Void>(); 4. RFuture<Boolean> future = unlockInnerAsync(threadId); 5. 6. future.addListener(new FutureListener<Boolean>() { 7. @Override 8. public void operationComplete(Future<Boolean> future) throws Exception { 9. if (!future.isSuccess()) { 10. cancelExpirationRenewal(threadId); 11. result.tryFailure(future.cause()); 12. return; 13. } 14. 15. Boolean opStatus = future.getNow(); 16. if (opStatus == null) { 17. IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " 18. + id + " thread-id: " + threadId); 19. result.tryFailure(cause); 20. return; 21. } 22. if (opStatus) { 23. cancelExpirationRenewal(null); 24. } 25. result.trySuccess(null); 26. } 27. }); 28. 29. return result; 30. } 31. 32. void cancelExpirationRenewal(Long threadId) { 33. ExpirationEntry task = expirationRenewalMap.get(getEntryName()); 34. if (task != null && (threadId == null || task.getThreadId() == threadId)) { 35. expirationRenewalMap.remove(getEntryName()); 36. task.getTimeout().cancel(); 37. } 38. } 39. 从方法名看,虽然对外好像是直接解锁,但内部是异步执行的 40. unlockInnerAsync()进行解锁 41. 从expirationRenewalMap移除,并把task.cancel() 42. protected RFuture<Boolean> unlockInnerAsync(long threadId) { 43. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 44. "if (redis.call('exists', KEYS[1]) == 0) then " + 45. "redis.call('publish', KEYS[2], ARGV[1]); " + 46. "return 1; " + 47. "end;" + 48. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + 49. "return nil;" + 50. "end; " + 51. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + 52. "if (counter > 0) then " + 53. "redis.call('pexpire', KEYS[1], ARGV[2]); " + 54. "return 0; " + 55. "else " + 56. "redis.call('del', KEYS[1]); " + 57. "redis.call('publish', KEYS[2], ARGV[1]); " + 58. "return 1; "+ 59. "end; " + 60. "return nil;", 61. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); 62. 63. } 64. lockname不存在,说明已经解锁,publish channelname unlockmessage;return 1 65. lockname存在,但对于uuid+id不存在,说明不是加锁的client,return nil 66. lockname存在,并且是当前加锁client 67. 对lockname uuid+id进行-1,如果counter>0则走5,如果=0 则走6 68. counter>0 说明锁重入了,计数器-1,并expire 69. counter=0 说明最终解锁,直接del key,并publish channelname unlockmessage;return 1
redission缺陷 使用cluster时 一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。 redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错 redlock实现会更复杂,但从他的算法上看,有zk选举的味道。对于更高可用分布锁,可以借助zk本身特性去实现
总结
对于锁,主要考虑性能与安全,即要保持锁的活跃性,又得保证锁的安全性
分布式锁,除了以上两点,还要考虑实现时的三要素
对于redission,对于锁部分的源码,还有很多的内容,很多的细节需要挖掘,此篇就不写了,太长。
后面再结合JUC,写篇更详细的源码分析
参考资料
Redis分布式锁的正确实现方式
redission