Redis分布式锁
经典面试题
Redis除了拿来做缓存,还可以用来做什么?:
- 分布式session
- 分布式锁
- 全局ID
- 点赞
- 位统计
- 差集交集并集,用户关注,可能认识的人,推荐模型
- 热点新闻,热搜排行榜
Redis做分布式锁的时候有需要注意的问题吗?
你们公司自己实现的分布式锁是否用setnx实现?这个是最合适的吗?你如何考虑分布式锁的可重入性?
Redis分布式锁如何续期?看门狗知道吗?
锁的种类
单机版同一个JVM虚拟机内,synchronized或者Lock接口
分布式多个不同JVM虚拟机,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了
靠谱的分布式锁需要具备的条件和刚需
独占性
OnlyOne,任何时刻只能有且仅有一个线程持有
高可用
若Redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况,高并发请求下,依旧性能OK好使
防死锁
杜绝死锁,必须有超时控制或者撤销操作,有个兜底终止跳出方案
不乱抢
防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放,自己约的锁自己含泪也要自己解
重入性
同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁
分布式锁
setnx key value
差评,setnx+expire不安全,两条命令非原子性的
set key value [EX seconds] [PX millseconds] [NX|XX]
重点
JUC中的AQS锁的规范落地参考 + 可重入锁考虑 + Lua脚本 + Redis命令一步步实现分布式锁
Base案例(boot + redis)
使用场景:多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止业务出现并发攻击)
InventoryService
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; lock.lock(); try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { lock.unlock(); } return retMessage+"\t"+"服务端口号:"+port; } }
InventoryController
@RestController @Api(tags = "redis分布式锁测试") public class InventoryController { @Autowired private InventoryService inventoryService; @ApiOperation("扣减库存,一次卖一个") @GetMapping(value = "/inventory/sale") public String sale() { return inventoryService.sale(); } }
这段代码算是初始版本,加了synchronized或者lock
nginx分布式微服务架构
v2.0版本分布式部署后,单机锁害死出现超卖现象,需要分布式锁
修改nginx上的配置文件 /usr/local/nginx/conf 目录下修改配置文件nginx.conf新增反向代理和负载均衡
启动配置两个InventoryService 分别在7777 和 8888端口
通过Nginx访问,你的linux服务器地址ip,反向代理 + 负载均衡
采用jmeter来模拟高并发
共有100个商品
发现76号商品被卖出两次,出现超卖故障现象
但是为什么加了synchronized或者lock还是没有控制住呢?
在单机环境下,可以使用synchronized或Lock来实现。
但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),
所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)
不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
分布式锁的出现,能够跨进程+跨服务、解决超卖、防止缓存击穿
解决
redis分布式锁
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; String key = "zzyyRedisLock"; String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId(); Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue); if(!flag){ //暂停20毫秒后递归调用 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } sale(); }else{ try{ //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { stringRedisTemplate.delete(key); } } return retMessage+"\t"+"服务端口号:"+port; } }
通过递归重试的方式,但是会有问题就是,测试手工OK,测试Jmeter压测5000OK
递归是一种思想没错,但是容易StackOverflowError,不太推荐,需要进一步完善
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; String key = "zzyyRedisLock"; String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId(); while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){ //暂停20毫秒,类似CAS自旋 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { stringRedisTemplate.delete(key); } return retMessage+"\t"+"服务端口号:"+port; } }
可以使用自旋来替代递归重试
宕机与过期+防止死锁
部署了微服务的java程序机器挂了,代码层面根本没有走到finally这块,没办法保证解锁(无过期时间key一直存在)这个key没有被删除,需要加入一个过期时间限定key
初步这样设计
while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) { //暂停20毫秒,进行递归重试..... try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } stringRedisTemplate.expire(key,30L,TimeUnit.SECONDS); // 请大家思考可以这么操作吗?
设置key + 过期时间分开了,必须要合并成一行具备原子性
解决
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; String key = "zzyyRedisLock"; String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId(); while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) { //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { stringRedisTemplate.delete(key); } return retMessage+"\t"+"服务端口号:"+port; } }
所以最终 加锁和过期时间设置必须同一行,保证原子性
防止误删key的问题
实际业务处理时间如果超过了默认设置key的过期时间?
那么就会出现张冠李戴,删除了别人的锁
那么就需要做成,自己删除自己的,不许动别人的
@Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock(); public String sale() { String retMessage = ""; String key = "zzyyRedisLock"; String uuidValue = IdUtil.simpleUUID()+":"+Thread.currentThread().getId(); while(!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) { //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } try { //1 查询库存信息 String result = stringRedisTemplate.opsForValue().get("inventory001"); //2 判断库存是否足够 Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); //3 扣减库存 if(inventoryNumber > 0) { stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: "+inventoryNumber+"\t"+uuidValue; System.out.println(retMessage); }else{ retMessage = "商品卖完了,o(╥﹏╥)o"; } }finally { // v5.0判断加锁与解锁是不是同一个客户端,同一个才行,自己只能删除自己的锁,不误删他人的 if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)){ stringRedisTemplate.delete(key); } } return retMessage+"\t"+"服务端口号:"+port; } }
Redis系列-10.Redis分布式锁(下):https://developer.aliyun.com/article/1414725