前言
大家好,我是捡田螺的小男孩。今天跟大家探讨一下分布式锁的设计与实现。希望对大家有帮助,如果有不正确的地方,欢迎指出,一起学习,一起进步哈~
- 分布式锁概述
- 数据库分布式锁
- Redis分布式锁
- Zookeeper分布式锁
- 三种分布式锁对比
1. 分布式锁概述
我们的系统都是分布式部署的,日常开发中,秒杀下单、抢购商品等等业务场景,为了防⽌库存超卖,都需要用到分布式锁。
分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
业界流行的分布式锁实现,一般有这3种方式:
- 基于数据库实现的分布式锁
- 基于Redis实现的分布式锁
- 基于Zookeeper实现的分布式锁
2. 基于数据库的分布式锁
2.1 数据库悲观锁实现的分布式锁
可以使用select ... for update
来实现分布式锁。我们自己的项目,分布式定时任务,就使用类似的实现方案,我给大家来展示个简单版的哈
表结构如下:
CREATE TABLE `t_resource_lock` ( `key_resource` varchar(45) COLLATE utf8_bin NOT NULL DEFAULT '资源主键', `status` char(1) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'S,F,P', `lock_flag` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '1是已经锁 0是未锁', `begin_time` datetime DEFAULT NULL COMMENT '开始时间', `end_time` datetime DEFAULT NULL COMMENT '结束时间', `client_ip` varchar(45) COLLATE utf8_bin NOT NULL DEFAULT '抢到锁的IP', `time` int(10) unsigned NOT NULL DEFAULT '60' COMMENT '方法生命周期内只允许一个结点获取一次锁,单位:分钟', PRIMARY KEY (`key_resource`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
加锁lock
方法的伪代码如下:
@Transcational //一定要加事务 public boolean lock(String keyResource,int time){ resourceLock = 'select * from t_resource_lock where key_resource ='#{keySource}' for update'; try{ if(resourceLock==null){ //插入锁的数据 resourceLock = new ResourceLock(); resourceLock.setTime(time); resourceLock.setLockFlag(1); //上锁 resourceLock.setStatus(P); //处理中 resourceLock.setBeginTime(new Date()); int count = "insert into resourceLock"; if(count==1){ //获取锁成功 return true; } return false; } }catch(Exception x){ return false; } //没上锁并且锁已经超时,即可以获取锁成功 if(resourceLock.getLockFlag=='0'&&'S'.equals(resourceLock.getstatus) && new Date()>=resourceLock.addDateTime(resourceLock.getBeginTime(,time)){ resourceLock.setLockFlag(1); //上锁 resourceLock.setStatus(P); //处理中 resourceLock.setBeginTime(new Date()); //update resourceLock; return true; }else if(new Date()>=resourceLock.addDateTime(resourceLock.getBeginTime(,time)){ //超时未正常执行结束,获取锁失败 return false; }else{ return false; } }
解锁unlock
方法的伪代码如下:
public void unlock(String v,status){ resourceLock.setLockFlag(0); //解锁 resourceLock.setStatus(status); S:表示成功,F表示失败 //update resourceLock; return ; }
整体流程:
try{ if(lock(keyResource,time)){ //加锁 status = process();//你的业务逻辑处理。 } } finally{ unlock(keyResource,status); //释放锁 }
其实这个悲观锁实现的分布式锁,整体的流程还是比较清晰的。就是先select ... for update 锁住主键key_resource那个记录,如果为空,则可以插入一条记录,如果已有记录判断下状态和时间,是否已经超时。这里需要注意一下哈,必须要加事务哈。
2.2 数据库乐观锁实现的分布式锁
除了悲观锁,还可以用乐观锁实现分布式锁。乐观锁,顾名思义,就是很乐观,每次更新操作,都觉得不会存在并发冲突,只有更新失败后,才重试。它是基于CAS思想实现的。我以前的公司,扣减余额就是用这种方案。
搞个version字段,每次更新修改,都会自增加一,然后去更新余额时,把查出来的那个版本号,带上条件去更新,如果是上次那个版本号,就更新,如果不是,表示别人并发修改过了,就继续重试。
大概流程如下:
查询版本号和余额
select version,balance from account where user_id ='666';
假设查到版本号是oldVersion=1.
- 逻辑处理,判断余额
if(balance<扣减金额){ return; } left_balance = balance - 扣减金额;
进行扣减余额
update account set balance = #{left_balance} ,version = version+1 where version = #{oldVersion} and balance>= #{left_balance} and user_id ='666';
大家可以看下这个流程图哈:
这种方式适合并发不高的场景,一般需要设置一下重试的次数
3.基于Redis实现的分布式锁
Redis分布式锁一般有以下这几种实现方式:
- setnx + expire
- setnx + value值是过期时间
- set的扩展命令(set ex px nx)
- set ex px nx + 校验唯一随机值,再删除
- Redisson
- Redisson + RedLock
3.1 setnx + expire
聊到Redis分布式锁,很多小伙伴反手就是setnx + expire
,如下:
if(jedis.setnx(key,lock_value) == 1){ //setnx加锁 expire(key,100); //设置过期时间 try { do something //业务处理 }catch(){ } finally { jedis.del(key); //释放锁 } }
这段代码是可以加锁成功,但是你有没有发现问题,加锁操作和设置超时时间是分开的。假设在执行完setnx加锁后,正要执行expire设置过期时间时,进程crash掉或者要重启维护了,那这个锁就长生不老了,别的线程永远获取不到锁啦,所以分布式锁不能这么实现!
3.2 setnx + value值是过期时间
long expires = System.currentTimeMillis() + expireTime; //系统时间+设置的过期时间 String expiresStr = String.valueOf(expires); // 如果当前锁不存在,返回加锁成功 if (jedis.setnx(key, expiresStr) == 1) { return true; } // 如果锁已经存在,获取锁的过期时间 String currentValueStr = jedis.get(key); // 如果获取到的过期时间,小于系统当前时间,表示已经过期 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间(不了解redis的getSet命令的小伙伴,可以去官网看下哈) String oldValueStr = jedis.getSet(key, expiresStr); if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁 return true; } } //其他情况,均返回加锁失败 return false; }
日常开发中,有些小伙伴就是这么实现分布式锁的,但是会有这些缺点:
过期时间是客户端自己生成的,分布式环境下,每个客户端的时间必须同步。
没有保存持有者的唯一标识,可能被别的客户端释放/解锁。
锁过期的时候,并发多个客户端同时请求过来,都执行了jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖。
3.3 set的扩展命令(set ex px nx)
这个命令的几个参数分别表示什么意思呢?跟大家复习一下:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
- EX second :设置键的过期时间为
second
秒。 - PX millisecond :设置键的过期时间为
millisecond
毫秒。 - NX :只在键不存在时,才对键进行设置操作。
- XX :只在键已经存在时,才对键进行设置操作。
if(jedis.set(key, lock_value, "NX", "EX", 100s) == 1){ //加锁 try { do something //业务处理 }catch(){ } finally { jedis.del(key); //释放锁 } }
这个方案可能存在这样的问题:
- 锁过期释放了,业务还没执行完。
- 锁被别的线程误删。
有些伙伴可能会有个疑问,就是锁为什么会被别的线程误删呢?假设并发多线程场景下,线程A获得了锁,但是它没释放锁的话,线程B是获取不到锁的,所以按道理它是执行不到加锁下面的代码滴,怎么会导致锁被别的线程误删呢?
假设线程A和B,都想用key加锁,最后A抢到锁加锁成功,但是由于执行业务逻辑的耗时很长,超过了设置的超时时间100s。这时候,Redis就自动释放了key锁。这时候线程B就可以加锁成功了,接下啦,它也执行业务逻辑处理。假设碰巧这时候,A执行完自己的业务逻辑,它就去释放锁,但是它就把B的锁给释放了。
3.4 set ex px nx + 校验唯一随机值,再删除
为了解决锁被别的线程误删问题。可以在set ex px nx
的基础上,加上个校验的唯一随机值,如下:
if(jedis.set(key, uni_request_id, "NX", "EX", 100s) == 1){ //加锁 try { do something //业务处理 }catch(){ } finally { //判断是不是当前线程加的锁,是才释放 if (uni_request_id.equals(jedis.get(key))) { jedis.del(key); //释放锁 } } }
在这里,判断当前线程加的锁和释放锁不是一个原子操作。如果调用jedis.del()
释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。
一般可以用lua脚本来包一下。lua脚本如下:
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end;
这种方式比较不错了,一般情况下,已经可以使用这种实现方式。但是还是存在:锁过期释放了,业务还没执行完的问题。