翎逸 2018-02-11 2473浏览量
[toc]
今天我们来聊聊分布式锁。
首先,我们看这样一个场景:客户下单的时候,我们调用库存中心进行减库存,那我们一般的操作都是
update store set num = $num where id=$id
这种通过设置库存的修改方式,我们知道在并发量高的时候会存在数据库的丢失更新,比如a,b当前两个事务,查询出来的库存都是5,a买了3个单子要把库存设置为2,而b买了1个单子要把库存设置为4,那这个时候就会出现a会覆盖b的更新,所以我们更多的都是会加个条件
update store set num = $num where id=$id and num=$query_num
即乐观锁的方式来处理,当然也可以通过版本号来处理乐观锁,都是一样的,但是这是更新一个表,如果我们牵扯到多个表呢,我们希望和这个单子关联的所有的表同一时间只能被一个线程来处理更新,多个线程按照不同的顺序去更新同一个单子关联的不同数据,出现死锁的概率比较大。对于非敏感的数据,我们也没有必要去都加乐观锁处理,我们的服务都是多机器部署的,要保证多进程多线程同时只能有一个进程的一个线程去处理,这个时候我们就需要用到分布式锁。
分布式锁的实现方式有很多,我们今天分别通过数据库,zk,redis以及tair的实现逻辑
更新一个单子关联的所有的数据,先查询出这个单子,并加上排他锁,在进行一系列的更新操作
begin transaction;
select ...for update;
doSomething();
commit();
这种处理需要主要依靠排他锁来阻塞其他线程,不过这个需要注意几点:
通过在一张表里创建唯一键来获取锁,比如执行saveStore这个方法
insert table lock_store ('method_name') values($method_name)
其中method_name是个唯一键,通过这种方式也可以做到,解锁的时候直接删除改行记录就行。不过这种方式,锁就不会是阻塞式的,因为插入数据是立马可以得到返回结果的。
那针对以上数据库实现的两种分布式锁,存在什么样的优缺点呢
使用zk来实现,代码网上比较多,我这里大致说下步骤,我们重点看redis的实现。
只需要删除步骤2中创建的节点即可
使用zk的分布式锁存在什么样的优缺点呢?
分布式锁介绍这块,我们重点看下redis的分布式锁的实现。
我们先举个例子,比如现在我要更新产品的信息,产品的唯一键就是productId
public boolean lock(String key, V v, int expireTime){
int retry = 0;
//获取锁失败最多尝试10次
while (retry < failRetryTimes){
//获取锁
Boolean result = redis.setNx(key, v, expireTime);
if (result){
return true;
}
try {
//获取锁失败间隔一段时间重试
TimeUnit.MILLISECONDS.sleep(sleepInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
public boolean unlock(String key){
return redis.delete(key);
}
public static void main(String[] args) {
Integer productId = 324324;
RedisLock<Integer> redisLock = new RedisLock<Integer>();
redisLock.lock(productId+"", productId, 1000);
}
这是一个简单的实现,存在的问题
针对以上问题我们改进下
private static volatile int count = 0;
public boolean lock(String key, V v, int expireTime){
int retry = 0;
//获取锁失败最多尝试10次
while (retry < failRetryTimes){
//1.先获取锁,如果是当前线程已经持有,则直接返回
//2.防止后面设置锁超时,其实是设置成功,而网络超时导致客户端返回失败,所以获取锁之前需要查询一下
V value = redis.get(key);
//如果当前锁存在,并且属于当前线程持有,则锁计数+1,直接返回
if (null != value && value.equals(v)){
count ++;
return true;
}
//如果锁已经被持有了,那需要等待锁的释放
if (value == null || count <= 0){
//获取锁
Boolean result = redis.setNx(key, v, expireTime);
if (result){
count = 1;
return true;
}
}
try {
//获取锁失败间隔一段时间重试
TimeUnit.MILLISECONDS.sleep(sleepInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
public boolean unlock(String key, String requestId){
String value = redis.get(key);
if (Strings.isNullOrEmpty(value)){
count = 0;
return true;
}
//判断当前锁的持有者是否是当前线程,如果是的话释放锁,不是的话返回false
if (value.equals(requestId)){
if (count > 1){
count -- ;
return true;
}
boolean delete = redis.delete(key);
if (delete){
count = 0;
}
return delete;
}
return false;
}
public static void main(String[] args) {
Integer productId = 324324;
RedisLock<String> redisLock = new RedisLock<String>();
String requestId = UUID.randomUUID().toString();
redisLock.lock(productId+"", requestId, 1000);
}
这种实现基本解决了误释放和可重入的问题。
这里说明几点:
之前在网上还看到有这种实现方式,就是获取到锁之后要检查下锁的过期时间,如果锁过期了要重新设置下时间,大致代码如下
public boolean tryLock2(String key, int expireTime){
long expires = System.currentTimeMillis() + expireTime;
//获取锁
Boolean result = redis.setNx(key, expires, expireTime);
if (result){
return true;
}
V value = redis.get(key);
if (value != null && (Long)value < System.currentTimeMillis()){
//锁已经过期
String oldValue = redis.getSet(key, expireTime);
if (oldValue != null && oldValue.equals(value)){
return true;
}
}
return false;
}
这种实现存在的问题,过度依赖当前服务器的时间了,如果在大量的并发请求下,都判断出了锁过期,而这个时候再去设置锁的时候,最终是会只有一个线程,但是可能会导致不同服务器根据自身不同的时间覆盖掉最终获取锁的那个线程设置的时间。
通过tair来实现分布式锁和redis的实现核心差不多,不过tair有个很方便的api,感觉是实现分布式锁的最佳配置,就是Put api调用的时候需要传入一个version,就和数据库的乐观锁一样,修改数据之后,版本会自动累加,如果传入的版本和当前数据版本不一致,就不允许修改,具体可以看下这篇文章的实现:Tair分布式锁这里就不再多说了
参考
http://www.cnblogs.com/luxiaoxun/p/4889764.html
http://blog.csdn.net/abccheng/article/details/72420996
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
分享数据库前沿,解构实战干货,推动数据库技术变革