Redis系列-10.Redis分布式锁(上):https://developer.aliyun.com/article/1414722
Lua保证原子性
前面有一个问题就是 finally块的判断 + del删除操作不是原子性的
为了保证原子性,需要启用lua脚本编写redis分布式锁判断 + 删除判断代码
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果值
解决
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; String key = "zzyyRedisLock"; String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId(); while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) { //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber+"\t"+uuidValue; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { //V6.0 将判断+删除自己的合并为lua脚本保证原子性 String luaScript = "if (redis.call('get',KEYS[1]) == ARGV[1]) then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end"; stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Arrays.asList(key), uuidValue); } return retMessage+"\t"+"服务端口号:"+port; } }
可重入锁 + 设计模型
如何兼顾锁的可重入性问题呢?
可重入锁
可重入锁又名递归锁
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。
所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
可重入锁的种类
隐式锁(即synchronized关键字使用的锁)默认是可重入锁
指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的
与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。
public class ReEntryLockDemo { public static void main(String[] args) { final Object objectLockA = new Object(); new Thread(() -> { synchronized (objectLockA) { System.out.println("-----外层调用"); synchronized (objectLockA) { System.out.println("-----中层调用"); synchronized (objectLockA) { System.out.println("-----内层调用"); } } } },"a").start(); } }
public class ReEntryLockDemo { public synchronized void m1() { System.out.println("-----m1"); m2(); } public synchronized void m2() { System.out.println("-----m2"); m3(); } public synchronized void m3() { System.out.println("-----m3"); } public static void main(String[] args) { ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo(); reEntryLockDemo.m1(); } }
Synchronized重入的实现机理
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。
在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。
**当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。**计数器为零代表锁已被释放。
lock/unlock配合可重入锁进行AQS源码分析
如果还是当前线程,则nextc = c + acquires
可重入锁的计数问题,redis中那个数据类型可以代替
K,K,V
Map>
命令
hset key field value
hset redis锁名字(zzyyRedisLock) 某个请求线程的UUID+ThreadID 加锁的次数
总结
setnx,只能解决有无的问题,够用但是不完美
hset,不但解决有无,还解决可重入
思考+设计
目前有两条支线,目的是保证同一个时刻只能有一个线程持有锁进去redis做扣减库存动作
- 可重入
- 扣减原子性
Lua脚本
redis命令过程分析
加锁lua脚本lock
先判断redis分布式锁这个key是否存在
EXISTS key
返回零说明不存在,hset新建当前线程属于自己的锁 BY UUID :ThreadID
HSET zzyyRedisLock 0c90d37cb6ec42268861b3d739f8b3a8:1 1
命令 key value = UUID:ThreadID 次数
返回一说明已经有锁,需进一步判断是不是当前线程自己的
返回零说明不是自己的
返回一说明是自己的锁,自增一次表示重入
上述设计修改为Lua脚本
if redis.call('exists','key') == 0 then redis.call('hset','key','uuid:threadid',1) redis.call('expire','key',30) return 1 elseif redis.call('hexists','key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end 相同部分是否可以替换处理??? hincrby命令可否替代hset命令
if redis.call('exists','key') == 0 or redis.call('hexists','key','uuid:threadid') == 1 then redis.call('hincrby','key','uuid:threadid',1) redis.call('expire','key',30) return 1 else return 0 end
测试
解锁lua脚本unlock
设计思路:有锁且还是自己的锁
HEXISTS key uuid:ThreadID
返回零,说明根本没有锁,程序块返回nil
不是零,说明有锁且是自己的锁,直接调用HINCRBY -1,表示每次减个一,解锁一次。直到它变为零表示可以删除该锁key。
全套流程
if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end
测试全套流程
将上述Lua脚本整个进入微服务Java程序
通过实现JUC里面的Lock接口,实现Redis分布式锁RedisDistributedLock
RedisDistributedLock
public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName;//KEYS[1] private String uuidValue;//ARGV[1] private long expireTime;//ARGV[2] public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName) { this.stringRedisTemplate = stringRedisTemplate; this.lockName = lockName; this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//UUID:ThreadID this.expireTime = 30L; } @Override public void lock() { tryLock(); } @Override public boolean tryLock() { try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false; } /** * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{ if(time != -1L){ this.expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end"; System.out.println("script: "+script); System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50); } return true; } /** *干活的,实现解锁功能 */ @Override public void unlock() { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; // nil = false 1 = true 0 = false System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if(flag == null) { throw new RuntimeException("This lock doesn't EXIST"); } } //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }
这样设计的问题?
考虑扩展,本次是redis实现分布式锁,以后是zookeeper、mysql呢?
引入工厂模式改造
DistributedLockFactory
@Component public class DistributedLockFactory { @Autowired private StringRedisTemplate stringRedisTemplate; private String lockName; public Lock getDistributedLock(String lockType) { if(lockType == null) return null; if(lockType.equalsIgnoreCase("REDIS")){ lockName = "zzyyRedisLock"; return new RedisDistributedLock(stringRedisTemplate,lockName); } else if(lockType.equalsIgnoreCase("ZOOKEEPER")){ //TODO zookeeper版本的分布式锁实现 return new ZookeeperDistributedLock(); } else if(lockType.equalsIgnoreCase("MYSQL")){ //TODO mysql版本的分布式锁实现 return null; } return null; } }
RedisDistributedLock
public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName;//KEYS[1] private String uuidValue;//ARGV[1] private long expireTime;//ARGV[2] public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName){ this.stringRedisTemplate = stringRedisTemplate; this.lockName = lockName; this.uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId();//UUID:ThreadID this.expireTime = 30L; } @Override public void lock(){ tryLock(); } @Override public boolean tryLock(){ try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false; } /** * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{ if(time != -1L){ this.expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end"; System.out.println("script: "+script); System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50); } return true; } /** *干活的,实现解锁功能 */ @Override public void unlock() { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; // nil = false 1 = true 0 = false System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if(flag == null) { throw new RuntimeException("This lock doesn't EXIST"); } } //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }
InventoryService
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; public String sale() { String retMessage = ""; Lock redisLock = distributedLockFactory.getDistributedLock("redis"); redisLock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { inventoryNumber = inventoryNumber - 1; stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber+"\t服务端口:" +port; System.out.println(retMessage); return retMessage; } retMessage = "商品卖完了,o(╥﹏╥)o"+"\t服务端口:" +port; }catch (Exception e){ e.printStackTrace(); }finally { redisLock.unlock(); } return retMessage; } }
进行可重入测试发现出现问题
DistributedLockFactory
@Component public class DistributedLockFactory { @Autowired private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; public DistributedLockFactory() { this.uuidValue = IdUtil.simpleUUID();//UUID } public Lock getDistributedLock(String lockType) { if(lockType == null) return null; if(lockType.equalsIgnoreCase("REDIS")){ lockName = "zzyyRedisLock"; return new RedisDistributedLock(stringRedisTemplate,lockName,uuidValue); } else if(lockType.equalsIgnoreCase("ZOOKEEPER")){ //TODO zookeeper版本的分布式锁实现 return new ZookeeperDistributedLock(); } else if(lockType.equalsIgnoreCase("MYSQL")){ //TODO mysql版本的分布式锁实现 return null; } return null; } }
RedisDistributedLock
public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName,String uuidValue) { this.stringRedisTemplate = stringRedisTemplate; this.lockName = lockName; this.uuidValue = uuidValue+":"+Thread.currentThread().getId(); this.expireTime = 30L; } @Override public void lock() { this.tryLock(); } @Override public boolean tryLock() { try { return this.tryLock(-1L,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if(time != -1L) { expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end"; System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue); while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { try { TimeUnit.MILLISECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } } return true; } @Override public void unlock() { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + "return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end"; System.out.println("lockName: "+lockName+"\t"+"uuidValue: "+uuidValue); Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime)); if(flag == null) { throw new RuntimeException("没有这个锁,HEXISTS查询无"); } } //========================================================= @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }
自动续期
Redis分布式锁还存在一个问题就是续期问题。
//==============自动续期 if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end
RedisDistributedLock
public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName;//KEYS[1] private String uuidValue;//ARGV[1] private long expireTime;//ARGV[2] public RedisDistributedLock(StringRedisTemplate stringRedisTemplate,String lockName,String uuidValue) { this.stringRedisTemplate = stringRedisTemplate; this.lockName = lockName; this.uuidValue = uuidValue+":"+Thread.currentThread().getId(); this.expireTime = 30L; } @Override public void lock() { tryLock(); } @Override public boolean tryLock() { try {tryLock(-1L,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false; } /** * 干活的,实现加锁功能,实现这一个干活的就OK,全盘通用 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if(time != -1L) { this.expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end"; System.out.println("script: "+script); System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50); } this.renewExpire(); return true; } /** *干活的,实现解锁功能 */ @Override public void unlock() { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; // nil = false 1 = true 0 = false System.out.println("lockName: "+lockName); System.out.println("uuidValue: "+uuidValue); System.out.println("expireTime: "+expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if(flag == null) { throw new RuntimeException("This lock doesn't EXIST"); } } // 上锁的情况 private void renewExpire() { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " + "return redis.call('expire',KEYS[1],ARGV[2]) " + "else " + "return 0 " + "end"; new Timer().schedule(new TimerTask() { @Override public void run() { if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { renewExpire(); } } },(this.expireTime * 1000)/3); } //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= //===下面的redis分布式锁暂时用不到======================================= @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }
总结
synchronized单机版可以,但是上分布式死翘翘
nginx分布式微服务单机锁不行
如果取消宕机所,可以考虑上redis分布式锁setnx
但是只是加了锁,没有释放锁,出现异常的话,可能无法释放锁,必须要在代码层面finally释放锁
宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要有lockkey的过期时间设定
为redis的分布式锁key,增加过期时间,此外,还必须要setnx+过期时间在同一行,保证原子性
且必须规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴
同时unlock变为Lua脚本保证
最后还需要考虑锁重入问题,使用hset替代setnx+lock变为lua脚本办证,以及自动续期问题