一文讲清楚硬核分布式锁

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 一文讲清楚硬核分布式锁


一、58集团自研WLock

1、Wlock与其他实现对比:

2、主要特性:

WLock基于WPaxos实现分布式锁服务,引入RocksDB实现锁状态持久化存储,封装TTL和续约机制以及watch异步通知功能,同时提供可视化监管平台,提供了一套完备的分布式锁实现方案;

(1)WPaxos简述:

WPaxos为58集团参照微信团队开源的PhxPaxos(C++)采用Java语言实现的分布式一致性组件,支持并行确定多个值,并将满足某种规则的请求,跳过prepare阶段,直接进入accept阶段,优化提交过程(Basic Paxos需要prepare与accept两阶段提交)。

(2)RocksDB简述:

LevelDB是由Google开源的,基于LSM Tree的单机KV数据库,其特点是高效,代码简洁而优美,Rocksdb则是Facebook基于LevelDB改造的。RocksDB 和LevelDB 是一个库,嵌入在用户的程序中,用户程序直接调用接口读写数据。相对于Redis不需要建立连接才能向他发请求,读写数据。

3、对WLock的封装后的工具类:

/**
 * @author Archi Liu
 * @version 1.0
 * @date 2021/11/10 3:20 下午
 * 分布式锁服务,当配置文件中存在wlock的配置时将创建该类实例Bean
 */
@Slf4j
@Service
@Conditional(WLockCondition.class)
public class LockService {
    /**
     * WLock的秘钥文件名(秘钥文件从WLock管理平台下载)
     */
    @Value("${wlock.key.file}")
    private String keyName;
    /**
     * 加解锁请求重试次数,底层默认重试2次,可修改该值提升性能
     */
    @Value("${wlock.retryNum:-1}")
    private Integer retryNum;
    /**
     * 若未设置过期锁时间,则使用该过期时间(30秒)
     */
    private final int defaultExpireTime = 30 * 1000;
    /**
     * 自动续期时间为过期时间的1/3
     */
    private final int defaultRenewIntervalTime = 10 * 1000;
    /**
     * 配置文件路径,需要兼容WF及SCF项目在容器环境和本地环境上的路径
     */
    private static String configPath;
    /**
     * wlock配置文件名
     */
    public static final String keyFile = "wlock.key.file";
    /**
     * 操作WLock的客户端,使用懒加载单例模式
     */
    private WLockClient wLockClient;
    /**
     * 如果是WF项目,需要调用该方法初始化WLock配置文件目录
     *
     * @param path
     */
    public static void initConfigPath(String path) {
        configPath = path;
    }
    /**
     * 获取WLock配置文件所在路径,如果项目中未配置则先检查是否为scf/scf-springboot项目容器部署环境,如果不是默认读取本地配置
     *
     * @return
     */
    private String getConfigPath() {
        if (configPath != null) {
            return configPath;
        }
        //如果是在容器环境上发布的scf/scf-springboot类型项目将会有该配置值
        configPath = System.getProperty("scf.config.location");
        if (StringUtils.isEmpty(configPath)) {
            configPath = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        }
        log.info("[LockUtil] configPath:{}", configPath);
        return configPath;
    }
    /**
     * 获取单例WLockClient
     *
     * @return
     */
    private WLockClient getWLockClient() {
        if (wLockClient != null) {
            return wLockClient;
        }
        synchronized (WLockClient.class) {
            if (wLockClient != null) {
                return wLockClient;
            }
            try {
                wLockClient = new WLockClient(getConfigPath() + keyName);
                //如果设置了重试次数需要重置默认重试次数,默认重试次数为3次(注意WLock内部将首次发请求也算作一次retry)
                if (retryNum >= 0) {
                    wLockClient.setDefaultRetries(retryNum + 1);
                }
            } catch (Exception e) {
                log.error("[LockUtil] WLockClient init failed!exception:{}", ExceptionUtil.getStackTrace(e));
                throw new DistributedLockException(ResponseCodeEnum.LOCK_CLIENT_INIT_FAIL);
            }
        }
        return wLockClient;
    }
    /**
     * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。
     *
     * @param lockName 锁名称
     * @return
     */
    public boolean tryGetDistributedLock(String lockName) {
        AcquireLockResult lockResult;
        try {
            WDistributedLock wLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wLock.tryAcquireLockUnblocked(defaultExpireTime, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},renewInterval={},exception:{}",
                    lockName, defaultExpireTime, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e));
            return false;
        }
        log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param unit       锁过期时间单位
     * @return
     */
    public boolean tryGetDistributedLock(String lockName, int expireTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLockUnblocked(lockExpireTime, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},exception:{}",
                    lockName, lockExpireTime, ExceptionUtil.getStackTrace(e));
            return false;
        }
        log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。
     *
     * @param lockName
     */
    public void getDistributedLock(String lockName) {
        //锁自动续期间隔(过期时间的三分之一)
        AcquireLockResult lockResult;
        try {
            WDistributedLock wLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wLock.tryAcquireLock(defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},renewInterval={},exception:{}",
                    lockName, defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param unit       锁过期时间单位
     * @return
     */
    public void getDistributedLock(String lockName, int expireTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLock(lockExpireTime, Integer.MAX_VALUE, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}",
                    lockName, lockExpireTime, Integer.MAX_VALUE, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 使用阻塞方式尝试获取分布式锁,最多等待maxWaitTime时间,成功获取到锁之后锁将在指定过期时间之后过期
     *
     * @param lockName   锁名称
     * @param expireTime 锁过期时间
     * @param expireTime 最长等待时间
     * @param unit       锁过期时间单位
     * @return
     */
    public void getDistributedLock(String lockName, int expireTime, int maxWaitTime, TimeUnit unit) {
        //锁过期时间
        int lockExpireTime = (int) unit.toMillis(expireTime);
        //获取锁最大等待时间
        int lockMaxWaitTime = (int) unit.toMillis(maxWaitTime);
        AcquireLockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.tryAcquireLock(lockExpireTime, lockMaxWaitTime, getLockExpireListener());
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}",
                    lockName, lockExpireTime, lockMaxWaitTime, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString());
        if (!lockResult.isSuccess()) {
            //修改成获取分布式锁失败的异常
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL);
        }
    }
    /**
     * 释放分布式锁,若释放成功返回true,否则返回false,锁释放失败不会抛出异常
     *
     * @param lockName 锁名称
     * @return
     */
    public boolean releaseDistributedLock(String lockName) {
        LockResult lockResult;
        try {
            WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName);
            lockResult = wdLock.releaseLock();
        } catch (ParameterIllegalException e) {
            log.error("[LockUtil] releaseDistributedLock error! parameter illegal,lockName={},exception:{}", lockName, ExceptionUtil.getStackTrace(e));
            throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR);
        }
        log.info("[LockUtil] releaseDistributedLock, lockName={}, result={}", lockName, lockResult.toString());
        return lockResult.isSuccess();
    }
    /**
     * 锁续约回调通知
     *
     * @return
     */
    private RenewListener getRenewListener() {
        RenewListener renewListener = new RenewListener() {
            @Override
            public void onRenewSuccess(String s) {
                log.info("[LockUtil] renewSuccess! info={}", s);
            }
            @Override
            public void onRenewFailed(String s) {
                log.info("[LockUtil] renewFailed! info={}", s);
            }
        };
        return renewListener;
    }
    /**
     * 锁过期回调通知
     *
     * @return
     */
    private LockExpireListener getLockExpireListener() {
        LockExpireListener lockExpireListener = new LockExpireListener() {
            @Override
            public void onExpire(String s) {
                log.info("[LockUtil] lock Expired! info={}", s);
            }
        };
        return lockExpireListener;
    }
}



WLock不像Redisson提供了多种类型的锁,其只提供了WDistributedLock,但同样支持互斥锁、可重入锁、公平锁及带权重优先级锁,可通过同步阻塞或者异步非阻塞方式获取到锁。所有对分布式锁的操作都通过该对象进行,在获取锁时可以传递以下参数:

waitAcquire

是否阻塞等待获取到锁,true为阻塞,false为非阻塞

expireTime

锁过期时间,单位毫秒,默认值为5分钟,最大取值5分钟,最小值5秒

maxWaitTime

最长等待获取锁的时间,单位毫秒,最大值Long.MAX_VALUE

weight

锁权重,默认都为1,取值范围[1, 10],权重越高,获取到锁概率越高

renewInterval

自动续约间隔,单位毫秒(默认为Integer.MAX_VALUE,不自动续租,最小自动续租间隔为1000ms,最大自动续租间隔不能超过过期时间,由业务控制)。

renewListener

续约Listener回调

lockExpireListener

锁过期Listener回调

watchListener

异步监听事件回调

死锁问题补充:

不管是Redisson还是WLock都使用了客户端定时续约的方式延长锁过期时间,如果处理不当将造成死锁:由于加锁和锁续约在两个线程中执行,若加锁线程在释放锁之前异常退出将导致续约线程一直执行续约操作,造成死锁,此时只能使用重启进程的方式进行锁释放。所以业务在加锁处理逻辑的上层一定添加try catch 异常获,在finally逻辑中释放锁。加解锁操作参照阿里开发规范:



正例:
  Lock lock = new XxxLock();
  // ...
  lock.lock();
  try {
      doSomething();
      doOthers();
  } finally {
      lock.unlock();
  }
反例:
  Lock lock = new XxxLock();
  // ...
  try {
      // 如果此处抛出异常,则直接执行 finally 代码块
      doSomething();
      // 无论加锁是否成功,finally 代码块都会执行
      lock.lock();
      doOthers();
  } finally {
      lock.unlock();
  }



二、单机锁

1、在Java中每个对象都有一把锁,如普通的Object对象及类的Class对象。线程可以使用synchronized关键字来获取对象上的锁。synchronized关键字可以应用在方法级别(粗粒度)或代码块级别(细粒度),在JDK1.6以前,使用synchronized只有一种方式即重量级锁,而在JDK1.6以后,引入了偏向锁,轻量级锁,重量级锁,来减少竞争带来的上下文切换。

2、JUC包下提供了如下的锁:

ReentrantLock

可重入锁,提供公平与非公平的方式

ReentrantReadWriteLock

可重入读写锁,写锁类似于可重入锁,读写互斥

StampedLock

邮戳锁,优化读写锁会造成写线程饥饿问题,提供了乐观读方式,读写不互斥

三、分布式锁

1、为什么我们需要分布式锁?

单机锁主要是为了同步同一进程中各个线程之间的操作。大多数互联网系统都是分布式部署的,当某个资源在多系统之间具有共享性的时候,为了保证大家访问这个资源数据是一致的,那么就必须要求在同一时刻只能被一个客户端处理,不能并发的执行,否者就会出现同一时刻有人写有人读,大家访问到的数据就不一致了。分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。

2、分布式锁需要具备的条件?

排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取

避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)

高可用:获取或释放锁的机制必须高可用且性能佳


3、Redisson中的RedLock:

(1)为什么需要RedLock?

上述提到的RedissonLock都是在Redis单机或主从模式下使用的,这种方式会有一个缺点:当主Redis宕机之后,从Redis还未同步保存在主Redis上的锁,此时将导致锁丢失。RedLock理论上可以解决该问题,RedLock使用多Redis节点,这样可以防止单点故障。

(2)RedLock的流程(假设有5个完全独立的redis主服务器):

  1. 获取当前时间戳。
  2. client尝试按照顺序使用相同的key,value获取所有redis服务的锁,在获取锁的过程中的获取时间比锁过期时间短很多,这是为了不要过长时间等待已经关闭的redis服务。并且试着获取下一个redis实例。比如:TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁,就放弃获取这个锁,从而尝试获取下个锁。
  3. client通过获取所有能获取的锁后的时间减去第一步的时间,这个时间差要小于TTL时间并且至少有3个redis(N/2+1)实例成功获取锁,才算真正的获取锁成功。
  4. 如果成功获取锁,则锁的真正有效时间是 TTL减去第三步的时间差 的时间;比如:TTL 是5s,获取所有锁用了2s,则真正锁有效时间为3s(其实应该再减去时钟漂移)
  5. 如果客户端由于某些原因获取锁失败,便会开始解锁所有redis实例;因为可能已经获取了小于3个 锁,必须释放,否则影响其他client获取锁。

Q:为什么需要获取到N/2+1个节点的响应?能不能只获取N/2+1个节点的锁而不是所有的节点?


(3)RedLock的缺点?

严重依赖时钟,如果某个Redis服务出现时钟跳跃(走的比其他机器快),那么可能会出现某个Redis节点的key提前过期,这样另外一个客户端就可能再次在N/2+1个Redis节点加锁成功(多个客户端同时获取到锁)。其实对各个Redis顺序加锁时也会导致过期时间不一致。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
缓存 NoSQL 算法
认真学习分布式应用中的分布式锁
认真学习分布式应用中的分布式锁
70 0
|
3月前
|
消息中间件 缓存 负载均衡
这些年背过的面试题——分布式篇
分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。
|
3月前
|
缓存 安全 Java
一文吃透面试线程必问10大问题
本文全面探讨了Java线程的十个关键面试问题,涵盖了线程的基本概念、创建方法、使用目的与好处、运行流程与状态、停止线程的正确方式、以及线程安全等高级主题。
|
6月前
|
存储 NoSQL 关系型数据库
聊一聊分布式锁的设计模型
本文介绍了分布式锁的设计模型、运行原理以及具体用法,作者也在文中体现了自己的关于分布式锁的思考以及具体实践。
52858 0
|
6月前
|
消息中间件 缓存 安全
清华架构大牛剖析高并发与多线程的关系、区别,带你击穿面试难题
当提起这两个词的时候,是不是很多人都认为高并发=多线程? 当面试官问到高并发系统可以采用哪些手段来解决,是不是一脸懵逼?
全到哭!从面试到架构,阿里大佬用五部分就把高并发编程讲清楚了
不知道大家最近去面试过没有?有去面试过的小伙伴应该会知道现在互联网企业招聘对于“高并发”这块的考察可以说是越来越注重了。基本上你简历上有高并发相关经验,就能成为企业优先考虑的候选人。其原因在于,企业真正需要的是能独立解决问题的人才。每年面试找工作的人很多,技术水平也是高低不一,而并发编程却一直是让大家很头疼的事情,很多人总觉得自己似乎掌握了并发编程的知识,但实际在面试或者工作中,都会被它吊打虐哭。
137 0
|
NoSQL 安全 Redis
分布式锁原理没搞懂,错失大厂offer
分布式锁原理没搞懂,错失大厂offer
62 0
直击灵魂!美团大牛手撸并发原理笔记,由浅入深剖析JDK源码
并发编程这四个字想必大家最近都在网上看到过有很多的帖子在讨论。我们都知道并发编程可选择的方式有多进程、多线程和多协程。在Java中,并发就是多线程模式。而多线程编程也一直是一个被广泛而深入讨论的领域。如果遇到复杂的多线程编程场景,大多数情况下我们就需要站在巨人的肩膀上利用并发编程框架——JDK Concurrent包来解决相关线程问题。
面试官:小伙子我们先来唠唠并发编程的几大核心知识点
并发编程算是Java的一个难点,经常做业务相关的程序员基本上用不到juc的包,但是这些知识点十分重要,所以不管在哪里,时刻保持学习真的很重要。
|
消息中间件 算法 JavaScript
面试官:谈谈分布式一致性机制,我一脸懵逼。。
面试官:谈谈分布式一致性机制,我一脸懵逼。。
下一篇
无影云桌面