文章目录
一、前言
分布式锁在实际中应用非常之广泛,对于互联网项目电商项目,秒杀活动中不能出现超买超卖的现象,分布式锁在其中具有重要的意义。
二、分布式锁
jvm提供了锁,如synchronized
,ReentrantLock
,也可以使用mysql数据库
实现悲观锁和乐观锁,这些锁只能解决单体应用。
面对多个服务,多个数据库或者负载均衡服务部署的情况下,以上的方式都无法满足,只能使用分布式锁来解决。
三、分布式锁实现
1、使用数据库表实现锁
db_lock
使用db_lock
表来记录锁,创建一个表来记录锁。
CREATE TABLE `db_lock` (
`lock_id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`lock_name` varchar(50) NOT NULL COMMENT '锁名',
`class_name` varchar(100) DEFAULT NULL COMMENT '类名',
`method_name` varchar(50) DEFAULT NULL COMMENT '方法名',
`server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',
`thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',
`create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '获取锁时间',
`lock_desc` varchar(100) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '描述',
PRIMARY KEY (`lock_id`),
UNIQUE KEY `idx_unique` (`lock_name`)
) ENGINE=InnoDB AUTO_INCREMENT=4654 DEFAULT CHARSET=utf8mb3;
代码
public String deductStockByDbLock(Integer productId) {
//1.获取锁
DbLock lock = new DbLock(null,"stock_"+productId,this.getClass().getName(),Thread.currentThread() .getStackTrace()[1].getMethodName(),"",Thread.currentThread().getName(),new Date(),"库存表stock获取锁:"+productId);
try {
//插入数据库锁
dbLockMapper.insert(lock);
}catch (Exception e){
e.printStackTrace();
//插入异常重试机制
try{
//线程睡眠
Thread.sleep(5);
//重试调用
deductStockByDbLock(productId);
}catch (Exception ex){
e.printStackTrace();
}
}
//2.锁成功执行业务逻辑
//查询当前的库存
Stock stock = stockMapper.getStockByProductId(productId);
if(stock !=null && stock.getStockNum() >0){
stock.setStockNum(stock.getStockNum()-1);
//更新库存
stockMapper.updateById(stock);
}
//3.释放锁
dbLockMapper.deleteById(lock.getLock_id());
return "扣减库存成功!";
}
缺陷
- 使用数据库表作为锁,需要一个数据库,必须要保证数据库可用才行,数据库出现问题,会导致业务系统不可用。
- 锁释放是在业务执行逻辑之后,一旦出现异常,可能释放锁操作失败,其他线程就无法获取到锁。
- 非重入锁,一旦锁没有释放,再次加锁就会造成死锁。
- 并发能力效率不是很高。
2、redis实现锁
实现原理
格式:setnx key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』
模式:将 SETNX 用于加锁(locking)
警告:已经证实这个加锁算法带有竞争条件,在特定情况下会造成错误,请不要使用这个加锁算法。
SETNX 可以用作加锁原语(locking primitive)。比如说,要对关键字(key) foo 加锁,
客户端可以尝试以下方式:
SETNX lock.foo <current Unix time + lock timeout + 1>
如果 SETNX 返回 1 ,说明客户端已经获得了锁, key 设置的unix 时间则指定了锁失效的时间。之后客户端可以通过 DEL lock.foo 来释放锁。
如果 SETNX 返回 0 ,说明 key 已经被其他客户端上锁了。如果锁是非阻塞(nonblocking lock)的,我们可以选择返回调用,或者进入一个重试循环,直到成功获得锁或重试超时(timeout)。
实现步骤
- 多个线程同时执行:setnx lock xxx 只有一个可以执行成功,获取锁失败重试。
- 获取锁执行业务逻辑。
- 释放锁,执行:del lock。
实现代码
redis配置
#配置redis
redis:
host: 192.168.5.130
database: 0 #配置数据库0
port: 6379 #端口
#password:
#配置连接池
jedis:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1 #ms
Java代码
/**
* 通过redis锁来实现扣减库存
* @param productId
* @return
*/
@Override
public String deductStockByRedis(Integer productId) {
//1.加锁以及重试
while(!redisTemplate.opsForValue().setIfAbsent("lockStock",geneUUID())){
try{
Thread.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
}
//2.查询库存是否充足
Stock stock = stockMapper.getStockByProductId(productId);
//3.业务逻辑
if(stock!= null && stock.getStockNum()>0){
stock.setStockNum(stock.getStockNum()-1);
stockMapper.updateById(stock);
}
//4.释放锁
redisTemplate.delete("lockStock");
return "扣减库存成功";
}
缺点
如果业务逻辑出现问题,无法释放锁会出现死锁。
解决
需要加锁的过期时间。
public String deductStockByRedis(Integer productId) {
//1.加锁以及重试 加上时间
while(!redisTemplate.opsForValue().setIfAbsent("lockStock",geneUUID(),5, TimeUnit.SECONDS)){
try{
Thread.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
}
//2.查询库存是否充足
Stock stock = stockMapper.getStockByProductId(productId);
//3.业务逻辑
if(stock!= null && stock.getStockNum()>0){
stock.setStockNum(stock.getStockNum()-1);
stockMapper.updateById(stock);
}
//4.释放锁
redisTemplate.delete("lockStock");
return "扣减库存成功";
}
加上过期时间有一个问题?过期时间应该加多长的时间呢
?
加上误删除
判断是否是同一个锁。
//防止误删除锁
if (StringUtils.equals(uuid, String.valueOf(redisTemplate.opsForValue().get("lockStock")))) {
redisTemplate.delete("lockStock");
}
同样还是出现问题
。
必须保证一组命令操作的原子性
。redis采用单线程架构
,可以保证单个命令的原子性
,但是无法保证一组命令在高并发场景下的原子性。
Java中redisson中的分布式锁
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
配置redission客户端
/**
* 获取redisson客户端
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
// 可以用"rediss://"来启用SSL连接
config.useSingleServer().setAddress("redis://192.168.5.130:6379");
return Redisson.create(config);
}
}
代码
@Autowired
private RedissonClient redissonClient;
/**
* redission实现扣减库存
* @param productId
* @return
*/
@Override
public String deductStockByRedission(Integer productId) {
// 加锁,获取锁失败重试
RLock lock = redissonClient.getLock("lockStock_"+productId);
lock.lock();
// 先查询库存是否充足
Stock stock = stockMapper.getStockByProductId(productId);
// 再减库存
if (stock != null && stock.getStockNum() > 0){
stock.setStockNum(stock.getStockNum() - 1);
stockMapper.updateById(stock);
}
// 释放锁
lock.unlock();
return "扣减库存成功";
}
3、zookeeper实现锁
zookeeper主要是对Node的操作来实现锁,对节点的创建,删除,修改以及子节点的变更操作。
引入zookeeper依赖客户端
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
锁实现的思路
1.创建一个节点==》 获取锁
2.删除一个节点==》 删除锁
3. 重试:没有获取到锁的请求重试
业务逻辑
多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁。执行业务逻辑,完成业务流程后,删除节点释放锁。
zk客户端
package com.elite.currencylock.config;
import org.apache.zookeeper.*;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Component
public class ZkClient {
private static final String connectString = "192.168.5.130:2181";
private static final String ROOT_PATH = "/distributedLock";
private ZooKeeper zooKeeper;
@PostConstruct
public void init(){
try {
// 连接zookeeper服务器
this.zooKeeper = new ZooKeeper(connectString, 30000, new
Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("获取链接成功!!");
}
});
// 创建分布式锁根节点
if (this.zooKeeper.exists(ROOT_PATH, false) == null){
this.zooKeeper.create(ROOT_PATH, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
System.out.println("获取链接失败!");
e.printStackTrace();
}
}
@PreDestroy
public void destroy(){
try {
if (zooKeeper != null){
zooKeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 初始化zk分布式锁对象方法
* @param lockName
* @return
*/
public ZkDistributedLock getZkDistributedLock(String lockName){
return new ZkDistributedLock(zooKeeper, lockName);
}
}
加锁释放锁
/**
* zookeeper实现分布式锁
*/
public class ZkDistributedLock {
private static final String ROOT_PATH = "/distributedLock";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){
this.zooKeeper = zooKeeper;
this.path = ROOT_PATH + "/" + lockName;
}
public void lock(){
try {
zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
} catch (Exception e) {
// 重试
try {
Thread.sleep(200);
lock();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
/**
*释放锁
*/
public void unlock(){
try {
this.zooKeeper.delete(path, 0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
扣减库存
/**
* zookeeper实现分布式
* @param productId
* @return
*/
@Override
public String deductStockByZookeeper(Integer productId) {
// 加锁,获取锁失败重试
ZkDistributedLock lock = zkClient.getZkDistributedLock("lockStock_"+productId);
lock.lock();
// 先查询库存是否充足
Stock stock = this.stockMapper.getStockByProductId(productId);
// 再减库存
if (stock != null && stock.getStockNum() > 0){
stock.setStockNum(stock.getStockNum() - 1);
this.stockMapper.updateById(stock);
}
//释放锁
lock.unlock();
return "扣减库存成功";
}
四、总结
本篇讲解了数据库锁的方式,缓存以及zk实现分布式锁,深入的还得进行每一种锁更深的原理进行挖掘。