一文讲清楚硬核分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 一文讲清楚硬核分布式锁


一、58集团自研WLock

1、Wlock与其他实现对比:

2、主要特性:

WLock基于WPaxos实现分布式锁服务,引入RocksDB实现锁状态持久化存储,封装TTL和续约机制以及watch异步通知功能,同时提供可视化监管平台,提供了一套完备的分布式锁实现方案;

(1)WPaxos简述:

WPaxos为58集团参照微信团队开源的PhxPaxos(C++)采用Java语言实现的分布式一致性组件,支持并行确定多个值,并将满足某种规则的请求,跳过prepare阶段,直接进入accept阶段,优化提交过程(Basic Paxos需要prepare与accept两阶段提交)。

(2)RocksDB简述:

LevelDB是由Google开源的,基于LSM Tree的单机KV数据库,其特点是高效,代码简洁而优美,Rocksdb则是Facebook基于LevelDB改造的。RocksDB 和LevelDB 是一个库,嵌入在用户的程序中,用户程序直接调用接口读写数据。相对于Redis不需要建立连接才能向他发请求,读写数据。

3、对WLock的封装后的工具类:

/**
 * @author Archi Liu
 * @version 1.0
 * @date 2021/11/10 3:20 下午
 * 分布式锁服务,当配置文件中存在wlock的配置时将创建该类实例Bean
 */
@Slf4j
@Service
@Conditional(WLockCondition.class)
public class LockService {
    /**
     * WLock的秘钥文件名(秘钥文件从WLock管理平台下载)
     */
    @Value("${wlock.key.file}")
    private String keyName;
    /**
     * 加解锁请求重试次数,底层默认重试2次,可修改该值提升性能
     */
    @Value("${wlock.retryNum:-1}")
    private Integer retryNum;
    /**
     * 若未设置过期锁时间,则使用该过期时间(30秒)
     */
    private final int defaultExpireTime = 30 * 1000;
    /**
     * 自动续期时间为过期时间的1/3
     */
    private final int defaultRenewIntervalTime = 10 * 1000;
    /**
     * 配置文件路径,需要兼容WF及SCF项目在容器环境和本地环境上的路径
     */
    private static String configPath;
    /**
     * wlock配置文件名
     */
    public static final String keyFile = "wlock.key.file";
    /**
     * 操作WLock的客户端,使用懒加载单例模式
     */
    private WLockClient wLockClient;
    /**
     * 如果是WF项目,需要调用该方法初始化WLock配置文件目录
     *
     * @param path
     */
    public static void initConfigPath(String path) {
        configPath = path;
    }
    /**
     * 获取WLock配置文件所在路径,如果项目中未配置则先检查是否为scf/scf-springboot项目容器部署环境,如果不是默认读取本地配置
     *
     * @return
     */
    private String getConfigPath() {
        if (configPath != null) {
            return configPath;
        }
        //如果是在容器环境上发布的scf/scf-springboot类型项目将会有该配置值
        configPath = System.getProperty("scf.config.location");
        if (StringUtils.isEmpty(configPath)) {
            configPath = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        }
        log.info("[LockUtil] configPath:{}", configPath);
        return configPath;
    }
    /**
     * 获取单例WLockClient
     *
     * @return
     */
    private WLockClient getWLockClient() {
        if (wLockClient != null) {
            return wLockClient;
        }
        synchronized (WLockClient.class) {
            if (wLockClient != null) {
                return wLockClient;
            }
            try {
                wLockClient = new WLockClient(getConfigPath() + keyName);
                //如果设置了重试次数需要重置默认重试次数,默认重试次数为3次(注意WLock内部将首次发请求也算作一次retry)
                if (retryNum >= 0) {
                    wLockClient.setDefaultRetries(retryNum + 1);
                }
            } catch (Exception e) {
                log.error("[LockUtil] WLockClient init failed!exception:{}", ExceptionUtil.getStackTrace(e));
                throw new DistributedLockException(ResponseCodeEnum.LOCK_CLIENT_INIT_FAIL);
            }
        }
        return wLockClient;
    }
    /**
     * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。
     *
     * @param lockName 锁名称
     * @return
     */
    public boolean tryGetDistributedLock(String lockName) {
        AcquireLockResult lockResult;
        try {
            WDistributedLock wLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wLock.tryAcquireLockUnblocked(defaultExpireTime, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},renewInterval={},exception:{}",
                    lockName, defaultExpireTime, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e));
            return false;
        }
        log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param unit       锁过期时间单位
     * @return
     */
    public boolean tryGetDistributedLock(String lockName, int expireTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLockUnblocked(lockExpireTime, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},exception:{}",
                    lockName, lockExpireTime, ExceptionUtil.getStackTrace(e));
            return false;
        }
        log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。
     *
     * @param lockName
     */
    public void getDistributedLock(String lockName) {
        //锁自动续期间隔(过期时间的三分之一)
        AcquireLockResult lockResult;
        try {
            WDistributedLock wLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wLock.tryAcquireLock(defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},renewInterval={},exception:{}",
                    lockName, defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param unit       锁过期时间单位
     * @return
     */
    public void getDistributedLock(String lockName, int expireTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLock(lockExpireTime, Integer.MAX_VALUE, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}",
                    lockName, lockExpireTime, Integer.MAX_VALUE, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,最多等待maxWaitTime时间,成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param expireTime 最长等待时间
     * @param unit       锁过期时间单位
     * @return
     */
    public void getDistributedLock(String lockName, int expireTime, int maxWaitTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        //获取锁最大等待时间
        int lockMaxWaitTime = (int) unit.toMillis(maxWaitTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLock(lockExpireTime, lockMaxWaitTime, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}",
                    lockName, lockExpireTime, lockMaxWaitTime, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            //修改成获取分布式锁失败的异常
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 释放分布式锁,若释放成功返回true,否则返回false,锁释放失败不会抛出异常
     *
     * @param lockName 锁名称
     * @return
     */
    public boolean releaseDistributedLock(String lockName) {
        LockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.releaseLock();
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] releaseDistributedLock error! parameter illegal,lockName={},exception:{}", lockName, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] releaseDistributedLock, lockName={}, result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 锁续约回调通知
     *
     * @return
     */
    private RenewListener getRenewListener() {
        RenewListener renewListener = new RenewListener() {
            @Override
            public void onRenewSuccess(String s) {
                log.info("[LockUtil] renewSuccess! info={}", s);
            }
            @Override
            public void onRenewFailed(String s) {
                log.info("[LockUtil] renewFailed! info={}", s);
            }
        };
        return renewListener;
    }
    /**
     * 锁过期回调通知
     *
     * @return
     */
    private LockExpireListener getLockExpireListener() {
        LockExpireListener lockExpireListener = new LockExpireListener() {
            @Override
            public void onExpire(String s) {
                log.info("[LockUtil] lock Expired! info={}", s);
            }
        };
        return lockExpireListener;
    }
}



WLock不像Redisson提供了多种类型的锁,其只提供了WDistributedLock,但同样支持互斥锁、可重入锁、公平锁及带权重优先级锁,可通过同步阻塞或者异步非阻塞方式获取到锁。所有对分布式锁的操作都通过该对象进行,在获取锁时可以传递以下参数:

waitAcquire

是否阻塞等待获取到锁,true为阻塞,false为非阻塞

expireTime

锁过期时间,单位毫秒,默认值为5分钟,最大取值5分钟,最小值5秒

maxWaitTime

最长等待获取锁的时间,单位毫秒,最大值Long.MAX_VALUE

weight

锁权重,默认都为1,取值范围[1, 10],权重越高,获取到锁概率越高

renewInterval

自动续约间隔,单位毫秒(默认为Integer.MAX_VALUE,不自动续租,最小自动续租间隔为1000ms,最大自动续租间隔不能超过过期时间,由业务控制)。

renewListener

续约Listener回调

lockExpireListener

锁过期Listener回调

watchListener

异步监听事件回调

死锁问题补充:

不管是Redisson还是WLock都使用了客户端定时续约的方式延长锁过期时间,如果处理不当将造成死锁:由于加锁和锁续约在两个线程中执行,若加锁线程在释放锁之前异常退出将导致续约线程一直执行续约操作,造成死锁,此时只能使用重启进程的方式进行锁释放。所以业务在加锁处理逻辑的上层一定添加try catch 异常获,在finally逻辑中释放锁。加解锁操作参照阿里开发规范:



正例:
  Lock lock = new XxxLock();
  // ...
  lock.lock();
  try {
      doSomething();
      doOthers();
  } finally {
      lock.unlock();
  }
反例:
  Lock lock = new XxxLock();
  // ...
  try {
      // 如果此处抛出异常,则直接执行 finally 代码块
      doSomething();
      // 无论加锁是否成功,finally 代码块都会执行
      lock.lock();
      doOthers();
  } finally {
      lock.unlock();
  }



二、单机锁

1、在Java中每个对象都有一把锁,如普通的Object对象及类的Class对象。线程可以使用synchronized关键字来获取对象上的锁。synchronized关键字可以应用在方法级别(粗粒度)或代码块级别(细粒度),在JDK1.6以前,使用synchronized只有一种方式即重量级锁,而在JDK1.6以后,引入了偏向锁,轻量级锁,重量级锁,来减少竞争带来的上下文切换。

2、JUC包下提供了如下的锁:

ReentrantLock

可重入锁,提供公平与非公平的方式

ReentrantReadWriteLock

可重入读写锁,写锁类似于可重入锁,读写互斥

StampedLock

邮戳锁,优化读写锁会造成写线程饥饿问题,提供了乐观读方式,读写不互斥

三、分布式锁

1、为什么我们需要分布式锁?

单机锁主要是为了同步同一进程中各个线程之间的操作。大多数互联网系统都是分布式部署的,当某个资源在多系统之间具有共享性的时候,为了保证大家访问这个资源数据是一致的,那么就必须要求在同一时刻只能被一个客户端处理,不能并发的执行,否者就会出现同一时刻有人写有人读,大家访问到的数据就不一致了。分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。

2、分布式锁需要具备的条件?

排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取

避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)

高可用:获取或释放锁的机制必须高可用且性能佳


3、Redisson中的RedLock:

(1)为什么需要RedLock?

上述提到的RedissonLock都是在Redis单机或主从模式下使用的,这种方式会有一个缺点:当主Redis宕机之后,从Redis还未同步保存在主Redis上的锁,此时将导致锁丢失。RedLock理论上可以解决该问题,RedLock使用多Redis节点,这样可以防止单点故障。

(2)RedLock的流程(假设有5个完全独立的redis主服务器):

  1. 获取当前时间戳。
  2. client尝试按照顺序使用相同的key,value获取所有redis服务的锁,在获取锁的过程中的获取时间比锁过期时间短很多,这是为了不要过长时间等待已经关闭的redis服务。并且试着获取下一个redis实例。比如:TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁,就放弃获取这个锁,从而尝试获取下个锁。
  3. client通过获取所有能获取的锁后的时间减去第一步的时间,这个时间差要小于TTL时间并且至少有3个redis(N/2+1)实例成功获取锁,才算真正的获取锁成功。
  4. 如果成功获取锁,则锁的真正有效时间是 TTL减去第三步的时间差 的时间;比如:TTL 是5s,获取所有锁用了2s,则真正锁有效时间为3s(其实应该再减去时钟漂移)
  5. 如果客户端由于某些原因获取锁失败,便会开始解锁所有redis实例;因为可能已经获取了小于3个 锁,必须释放,否则影响其他client获取锁。

Q:为什么需要获取到N/2+1个节点的响应?能不能只获取N/2+1个节点的锁而不是所有的节点?


(3)RedLock的缺点?

严重依赖时钟,如果某个Redis服务出现时钟跳跃(走的比其他机器快),那么可能会出现某个Redis节点的key提前过期,这样另外一个客户端就可能再次在N/2+1个Redis节点加锁成功(多个客户端同时获取到锁)。其实对各个Redis顺序加锁时也会导致过期时间不一致。


相关文章
|
SQL 缓存 算法
香,聊聊TiDB的分布式事务模型
香,聊聊TiDB的分布式事务模型
720 0
香,聊聊TiDB的分布式事务模型
|
4月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
Linux 数据安全/隐私保护 Windows
更换(Pypi)pip源到国内镜像
pip国内的一些镜像 阿里云 http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.
247558 2
|
NoSQL
共识协议的技术变迁问题之WPaxos理常态下的IO请求处理如何解决
共识协议的技术变迁问题之WPaxos理常态下的IO请求处理如何解决
265 55
|
缓存 负载均衡 算法
HTTPS对性能的一些影响
HTTPS对性能的一些影响
713 9
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
410 0
|
安全 API 调度
异步编程中常见的问题和处理方式
【6月更文挑战第23天】在python中`asyncio` 提供PriorityQueue和LifoQueue,用于不同检索策略。异步编程需注意任务调度、错误处理和资源管理,以提高响应性和避免阻塞。
494 7
异步编程中常见的问题和处理方式
|
SQL 数据采集 监控
14个Flink SQL性能优化实践分享
本文档详细列举了Apache Flink SQL的性能调优策略。主要关注点包括:增加数据源读取并行度、优化状态管理(如使用RocksDB状态后端并设置清理策略)、调整窗口操作以减少延迟、避免类型转换和不合理的JOIN操作、使用广播JOIN、注意SQL查询复杂度、控制并发度和资源调度、自定义源码实现、执行计划分析、异常检测与恢复、监控报警、数据预处理与清洗、利用高级特性(如容器化部署和UDF)以及数据压缩与序列化。此外,文档还强调了任务并行化、网络传输优化、系统配置调优、数据倾斜处理和任务调度策略。通过这些方法,可以有效解决性能问题,提升Flink SQL的运行效率。
562 5
|
安全 Java UED
Header Location重定向机制解析与应用
Header Location重定向机制解析与应用
|
机器学习/深度学习 存储 人工智能
基于人工智能的个性化推荐系统研究
基于人工智能的个性化推荐系统研究
734 0