Java中几种分布式锁的实现

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Java中几种分布式锁的实现

文章目录

一、前言

分布式锁在实际中应用非常之广泛,对于互联网项目电商项目,秒杀活动中不能出现超买超卖的现象,分布式锁在其中具有重要的意义。

二、分布式锁

jvm提供了锁,如synchronized,ReentrantLock,也可以使用mysql数据库实现悲观锁和乐观锁,这些锁只能解决单体应用。

面对多个服务,多个数据库或者负载均衡服务部署的情况下,以上的方式都无法满足,只能使用分布式锁来解决。

三、分布式锁实现

1、使用数据库表实现锁

db_lock

使用db_lock表来记录锁,创建一个表来记录锁。

CREATE TABLE `db_lock` (

 `lock_id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',

 `lock_name` varchar(50) NOT NULL COMMENT '锁名',

 `class_name` varchar(100) DEFAULT NULL COMMENT '类名',

 `method_name` varchar(50) DEFAULT NULL COMMENT '方法名',

 `server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',

 `thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',

 `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '获取锁时间',

 `lock_desc` varchar(100) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL COMMENT '描述',

 PRIMARY KEY (`lock_id`),

 UNIQUE KEY `idx_unique` (`lock_name`)

) ENGINE=InnoDB AUTO_INCREMENT=4654 DEFAULT CHARSET=utf8mb3;

代码

public String deductStockByDbLock(Integer productId) {

       //1.获取锁

       DbLock lock = new DbLock(null,"stock_"+productId,this.getClass().getName(),Thread.currentThread() .getStackTrace()[1].getMethodName(),"",Thread.currentThread().getName(),new Date(),"库存表stock获取锁:"+productId);

       try {

           //插入数据库锁

           dbLockMapper.insert(lock);

       }catch (Exception e){

           e.printStackTrace();

           //插入异常重试机制

           try{

               //线程睡眠

               Thread.sleep(5);

               //重试调用

               deductStockByDbLock(productId);

           }catch (Exception ex){

              e.printStackTrace();

           }

       }

       //2.锁成功执行业务逻辑

       //查询当前的库存

       Stock stock = stockMapper.getStockByProductId(productId);

       if(stock !=null && stock.getStockNum() >0){

           stock.setStockNum(stock.getStockNum()-1);

           //更新库存

           stockMapper.updateById(stock);

       }

       //3.释放锁

       dbLockMapper.deleteById(lock.getLock_id());

       return "扣减库存成功!";

   }

缺陷

  1. 使用数据库表作为锁,需要一个数据库,必须要保证数据库可用才行,数据库出现问题,会导致业务系统不可用。
  2. 锁释放是在业务执行逻辑之后,一旦出现异常,可能释放锁操作失败,其他线程就无法获取到锁。
  3. 非重入锁,一旦锁没有释放,再次加锁就会造成死锁。
  4. 并发能力效率不是很高。

2、redis实现锁

实现原理

格式:setnx key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』

模式:将 SETNX 用于加锁(locking)

警告:已经证实这个加锁算法带有竞争条件,在特定情况下会造成错误,请不要使用这个加锁算法。

SETNX 可以用作加锁原语(locking primitive)。比如说,要对关键字(key) foo 加锁,

客户端可以尝试以下方式:

SETNX lock.foo <current Unix time + lock timeout + 1>

如果 SETNX 返回 1 ,说明客户端已经获得了锁, key 设置的unix 时间则指定了锁失效的时间。之后客户端可以通过 DEL lock.foo 来释放锁。

如果 SETNX 返回 0 ,说明 key 已经被其他客户端上锁了。如果锁是非阻塞(nonblocking lock)的,我们可以选择返回调用,或者进入一个重试循环,直到成功获得锁或重试超时(timeout)。

实现步骤

  1. 多个线程同时执行:setnx lock xxx 只有一个可以执行成功,获取锁失败重试。
  2. 获取锁执行业务逻辑。
  3. 释放锁,执行:del lock。

实现代码

redis配置

#配置redis

 redis:

   host: 192.168.5.130

   database: 0  #配置数据库0

   port: 6379 #端口

   #password:

   #配置连接池

   jedis:

     pool:

       max-active: 8

       max-idle: 8

       min-idle: 0

       max-wait: -1 #ms

Java代码


/**

    * 通过redis锁来实现扣减库存

    * @param productId

    * @return

    */

   @Override

   public String deductStockByRedis(Integer productId) {

       //1.加锁以及重试

       while(!redisTemplate.opsForValue().setIfAbsent("lockStock",geneUUID())){

          try{

              Thread.sleep(5);

          }catch (InterruptedException e){

              e.printStackTrace();

          }

       }

       //2.查询库存是否充足

       Stock stock = stockMapper.getStockByProductId(productId);

       //3.业务逻辑

       if(stock!= null && stock.getStockNum()>0){

           stock.setStockNum(stock.getStockNum()-1);

           stockMapper.updateById(stock);

       }

       //4.释放锁

       redisTemplate.delete("lockStock");

       return "扣减库存成功";

   }

缺点

如果业务逻辑出现问题,无法释放锁会出现死锁。

解决

需要加锁的过期时间。

public String deductStockByRedis(Integer productId) {

       //1.加锁以及重试 加上时间

       while(!redisTemplate.opsForValue().setIfAbsent("lockStock",geneUUID(),5, TimeUnit.SECONDS)){

          try{

              Thread.sleep(5);

          }catch (InterruptedException e){

              e.printStackTrace();

          }

       }

       //2.查询库存是否充足

       Stock stock = stockMapper.getStockByProductId(productId);

       //3.业务逻辑

       if(stock!= null && stock.getStockNum()>0){

           stock.setStockNum(stock.getStockNum()-1);

           stockMapper.updateById(stock);

       }

       //4.释放锁

       redisTemplate.delete("lockStock");

       return "扣减库存成功";

   }

加上过期时间有一个问题?过期时间应该加多长的时间呢

加上误删除

判断是否是同一个锁。

//防止误删除锁

       if (StringUtils.equals(uuid, String.valueOf(redisTemplate.opsForValue().get("lockStock")))) {

           redisTemplate.delete("lockStock");

       }

同样还是出现问题

必须保证一组命令操作的原子性redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。

Java中redisson中的分布式锁

引入依赖

<dependency>

<groupId>org.redisson</groupId>

<artifactId>redisson</artifactId>

<version>3.11.2</version>

</dependency>

配置redission客户端

/**

* 获取redisson客户端

*/

@Configuration

public class RedissonConfig {

   @Bean

   public RedissonClient redissonClient(){

       Config config = new Config();

       // 可以用"rediss://"来启用SSL连接

       config.useSingleServer().setAddress("redis://192.168.5.130:6379");

       return Redisson.create(config);

   }

}

代码

   @Autowired

   private RedissonClient redissonClient;

 /**

    *  redission实现扣减库存

    * @param productId

    * @return

    */

   @Override

   public String deductStockByRedission(Integer productId) {

       // 加锁,获取锁失败重试

       RLock lock = redissonClient.getLock("lockStock_"+productId);

       lock.lock();

       // 先查询库存是否充足

       Stock stock = stockMapper.getStockByProductId(productId);

       // 再减库存

       if (stock != null && stock.getStockNum() > 0){

           stock.setStockNum(stock.getStockNum() - 1);

           stockMapper.updateById(stock);

       }

       // 释放锁

       lock.unlock();

       return "扣减库存成功";

   }

3、zookeeper实现锁

zookeeper主要是对Node的操作来实现锁,对节点的创建,删除,修改以及子节点的变更操作。

引入zookeeper依赖客户端

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.14</version>

</dependency>

锁实现的思路

1.创建一个节点==》 获取锁

2.删除一个节点==》 删除锁

3. 重试:没有获取到锁的请求重试

业务逻辑

多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁。执行业务逻辑,完成业务流程后,删除节点释放锁。

zk客户端

package com.elite.currencylock.config;

import org.apache.zookeeper.*;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;

@Component

public class ZkClient {

   private static final String connectString = "192.168.5.130:2181";

   private static final String ROOT_PATH = "/distributedLock";

   private ZooKeeper zooKeeper;

   @PostConstruct

   public void init(){

       try {

            // 连接zookeeper服务器

            this.zooKeeper = new ZooKeeper(connectString, 30000, new

                   Watcher() {

                       @Override

                       public void process(WatchedEvent event) {

                           System.out.println("获取链接成功!!");

                       }

                   });

             // 创建分布式锁根节点

           if (this.zooKeeper.exists(ROOT_PATH, false) == null){

               this.zooKeeper.create(ROOT_PATH, null,

                       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

           }

       } catch (Exception e) {

           System.out.println("获取链接失败!");

           e.printStackTrace();

       }

   }

   @PreDestroy

   public void destroy(){

       try {

           if (zooKeeper != null){

               zooKeeper.close();

           }

       } catch (InterruptedException e) {

           e.printStackTrace();

       }

   }

   /**

    * 初始化zk分布式锁对象方法

    * @param lockName

    * @return

    */

   public ZkDistributedLock getZkDistributedLock(String lockName){

       return new ZkDistributedLock(zooKeeper, lockName);

   }

}

加锁释放锁

/**

* zookeeper实现分布式锁

*/

public class ZkDistributedLock {

   private static final String ROOT_PATH = "/distributedLock";

   private String path;

   private ZooKeeper zooKeeper;

   public ZkDistributedLock(ZooKeeper zooKeeper, String lockName){

       this.zooKeeper = zooKeeper;

       this.path = ROOT_PATH + "/" + lockName;

   }

   public void lock(){

       try {

           zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,

           CreateMode.EPHEMERAL);

       } catch (Exception e) {

           // 重试

           try {

               Thread.sleep(200);

               lock();

           } catch (InterruptedException ex) {

                ex.printStackTrace();

           }

       }

   }

   /**

    *释放锁

    */

   public void unlock(){

       try {

           this.zooKeeper.delete(path, 0);

       } catch (InterruptedException e) {

           e.printStackTrace();

       } catch (KeeperException e) {

           e.printStackTrace();

       }

   }

}

扣减库存

   /**

    * zookeeper实现分布式

    * @param productId

    * @return

    */

   @Override

   public String deductStockByZookeeper(Integer productId) {

       // 加锁,获取锁失败重试

       ZkDistributedLock lock = zkClient.getZkDistributedLock("lockStock_"+productId);

       lock.lock();

       // 先查询库存是否充足

       Stock stock = this.stockMapper.getStockByProductId(productId);

       // 再减库存

       if (stock != null && stock.getStockNum() > 0){

           stock.setStockNum(stock.getStockNum() - 1);

           this.stockMapper.updateById(stock);

       }

       //释放锁

       lock.unlock();

       return "扣减库存成功";

   }

四、总结

本篇讲解了数据库锁的方式,缓存以及zk实现分布式锁,深入的还得进行每一种锁更深的原理进行挖掘。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
19天前
|
存储 NoSQL Java
Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
【10月更文挑战第29天】Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
49 1
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
61 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
1月前
|
NoSQL Java 数据库
Java分布式锁
Java分布式锁
38 0
|
1月前
|
缓存 Java 数据库
JAVA分布式CAP原则
JAVA分布式CAP原则
63 0
|
3月前
|
存储 NoSQL Java
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
73 8
|
4月前
|
负载均衡 NoSQL Java
|
4月前
|
存储 算法 Java
分布式自增ID算法---雪花算法(SnowFlake)Java实现
分布式自增ID算法---雪花算法(SnowFlake)Java实现
287 0