一、58集团自研WLock
1、Wlock与其他实现对比:
2、主要特性:
WLock基于WPaxos实现分布式锁服务,引入RocksDB实现锁状态持久化存储,封装TTL和续约机制以及watch异步通知功能,同时提供可视化监管平台,提供了一套完备的分布式锁实现方案;
(1)WPaxos简述:
WPaxos为58集团参照微信团队开源的PhxPaxos(C++)采用Java语言实现的分布式一致性组件,支持并行确定多个值,并将满足某种规则的请求,跳过prepare阶段,直接进入accept阶段,优化提交过程(Basic Paxos需要prepare与accept两阶段提交)。
(2)RocksDB简述:
LevelDB是由Google开源的,基于LSM Tree的单机KV数据库,其特点是高效,代码简洁而优美,Rocksdb则是Facebook基于LevelDB改造的。RocksDB 和LevelDB 是一个库,嵌入在用户的程序中,用户程序直接调用接口读写数据。相对于Redis不需要建立连接才能向他发请求,读写数据。
3、对WLock的封装后的工具类:
/** * @author Archi Liu * @version 1.0 * @date 2021/11/10 3:20 下午 * 分布式锁服务,当配置文件中存在wlock的配置时将创建该类实例Bean */ @Slf4j @Service @Conditional(WLockCondition.class) public class LockService { /** * WLock的秘钥文件名(秘钥文件从WLock管理平台下载) */ @Value("${wlock.key.file}") private String keyName; /** * 加解锁请求重试次数,底层默认重试2次,可修改该值提升性能 */ @Value("${wlock.retryNum:-1}") private Integer retryNum; /** * 若未设置过期锁时间,则使用该过期时间(30秒) */ private final int defaultExpireTime = 30 * 1000; /** * 自动续期时间为过期时间的1/3 */ private final int defaultRenewIntervalTime = 10 * 1000; /** * 配置文件路径,需要兼容WF及SCF项目在容器环境和本地环境上的路径 */ private static String configPath; /** * wlock配置文件名 */ public static final String keyFile = "wlock.key.file"; /** * 操作WLock的客户端,使用懒加载单例模式 */ private WLockClient wLockClient; /** * 如果是WF项目,需要调用该方法初始化WLock配置文件目录 * * @param path */ public static void initConfigPath(String path) { configPath = path; } /** * 获取WLock配置文件所在路径,如果项目中未配置则先检查是否为scf/scf-springboot项目容器部署环境,如果不是默认读取本地配置 * * @return */ private String getConfigPath() { if (configPath != null) { return configPath; } //如果是在容器环境上发布的scf/scf-springboot类型项目将会有该配置值 configPath = System.getProperty("scf.config.location"); if (StringUtils.isEmpty(configPath)) { configPath = Thread.currentThread().getContextClassLoader().getResource("").getPath(); } log.info("[LockUtil] configPath:{}", configPath); return configPath; } /** * 获取单例WLockClient * * @return */ private WLockClient getWLockClient() { if (wLockClient != null) { return wLockClient; } synchronized (WLockClient.class) { if (wLockClient != null) { return wLockClient; } try { wLockClient = new WLockClient(getConfigPath() + keyName); //如果设置了重试次数需要重置默认重试次数,默认重试次数为3次(注意WLock内部将首次发请求也算作一次retry) if (retryNum >= 0) { wLockClient.setDefaultRetries(retryNum + 1); } } catch (Exception e) { log.error("[LockUtil] WLockClient init failed!exception:{}", ExceptionUtil.getStackTrace(e)); throw new DistributedLockException(ResponseCodeEnum.LOCK_CLIENT_INIT_FAIL); } } return wLockClient; } /** * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。 * * @param lockName 锁名称 * @return */ public boolean tryGetDistributedLock(String lockName) { AcquireLockResult lockResult; try { WDistributedLock wLock = getWLockClient().newDistributeLock(lockName); lockResult = wLock.tryAcquireLockUnblocked(defaultExpireTime, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener()); } catch (ParameterIllegalException e) { log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},renewInterval={},exception:{}", lockName, defaultExpireTime, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e)); return false; } log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString()); return lockResult.isSuccess(); } /** * 使用非阻塞方式尝试获取分布式锁,若获取成功返回true,否则返回false。成功获取到锁之后锁将在指定过期时间之后过期 * * @param lockName 锁名称 * @param expireTime 锁过期时间 * @param unit 锁过期时间单位 * @return */ public boolean tryGetDistributedLock(String lockName, int expireTime, TimeUnit unit) { //锁过期时间 int lockExpireTime = (int) unit.toMillis(expireTime); AcquireLockResult lockResult; try { WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName); lockResult = wdLock.tryAcquireLockUnblocked(lockExpireTime, getLockExpireListener()); } catch (ParameterIllegalException e) { log.error("[LockUtil] tryGetDistributedLock error! parameter illegal, lockName={},lockExpireTime={},exception:{}", lockName, lockExpireTime, ExceptionUtil.getStackTrace(e)); return false; } log.info("[LockUtil] tryGetDistributedLock lockName={},result={}", lockName, lockResult.toString()); return lockResult.isSuccess(); } /** * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,初始锁过期时间为30秒,成功获取到锁之后将自动进行锁续期。 * * @param lockName */ public void getDistributedLock(String lockName) { //锁自动续期间隔(过期时间的三分之一) AcquireLockResult lockResult; try { WDistributedLock wLock = getWLockClient().newDistributeLock(lockName); lockResult = wLock.tryAcquireLock(defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, getRenewListener(), getLockExpireListener()); } catch (ParameterIllegalException e) { log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},renewInterval={},exception:{}", lockName, defaultExpireTime, Integer.MAX_VALUE, defaultRenewIntervalTime, ExceptionUtil.getStackTrace(e)); throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR); } log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString()); if (!lockResult.isSuccess()) { throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL); } } /** * 使用阻塞方式尝试获取分布式锁,若未获取到将一直阻塞等待,成功获取到锁之后锁将在指定过期时间之后过期 * * @param lockName 锁名称 * @param expireTime 锁过期时间 * @param unit 锁过期时间单位 * @return */ public void getDistributedLock(String lockName, int expireTime, TimeUnit unit) { //锁过期时间 int lockExpireTime = (int) unit.toMillis(expireTime); AcquireLockResult lockResult; try { WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName); lockResult = wdLock.tryAcquireLock(lockExpireTime, Integer.MAX_VALUE, getLockExpireListener()); } catch (ParameterIllegalException e) { log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}", lockName, lockExpireTime, Integer.MAX_VALUE, ExceptionUtil.getStackTrace(e)); throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR); } log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString()); if (!lockResult.isSuccess()) { throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL); } } /** * 使用阻塞方式尝试获取分布式锁,最多等待maxWaitTime时间,成功获取到锁之后锁将在指定过期时间之后过期 * * @param lockName 锁名称 * @param expireTime 锁过期时间 * @param expireTime 最长等待时间 * @param unit 锁过期时间单位 * @return */ public void getDistributedLock(String lockName, int expireTime, int maxWaitTime, TimeUnit unit) { //锁过期时间 int lockExpireTime = (int) unit.toMillis(expireTime); //获取锁最大等待时间 int lockMaxWaitTime = (int) unit.toMillis(maxWaitTime); AcquireLockResult lockResult; try { WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName); lockResult = wdLock.tryAcquireLock(lockExpireTime, lockMaxWaitTime, getLockExpireListener()); } catch (ParameterIllegalException e) { log.error("[LockUtil] getDistributedLock error! parameter illegal, lockName={},lockExpireTime={},lockMaxWaitTime={},exception:{}", lockName, lockExpireTime, lockMaxWaitTime, ExceptionUtil.getStackTrace(e)); throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR); } log.info("[LockUtil] getDistributedLock lockName={},lockResult={}", lockName, lockResult.toString()); if (!lockResult.isSuccess()) { //修改成获取分布式锁失败的异常 throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_FAIL); } } /** * 释放分布式锁,若释放成功返回true,否则返回false,锁释放失败不会抛出异常 * * @param lockName 锁名称 * @return */ public boolean releaseDistributedLock(String lockName) { LockResult lockResult; try { WDistributedLock wdLock = getWLockClient().newDistributeLock(lockName); lockResult = wdLock.releaseLock(); } catch (ParameterIllegalException e) { log.error("[LockUtil] releaseDistributedLock error! parameter illegal,lockName={},exception:{}", lockName, ExceptionUtil.getStackTrace(e)); throw new DistributedLockException(ResponseCodeEnum.GET_LOCK_PARAM_ERROR); } log.info("[LockUtil] releaseDistributedLock, lockName={}, result={}", lockName, lockResult.toString()); return lockResult.isSuccess(); } /** * 锁续约回调通知 * * @return */ private RenewListener getRenewListener() { RenewListener renewListener = new RenewListener() { @Override public void onRenewSuccess(String s) { log.info("[LockUtil] renewSuccess! info={}", s); } @Override public void onRenewFailed(String s) { log.info("[LockUtil] renewFailed! info={}", s); } }; return renewListener; } /** * 锁过期回调通知 * * @return */ private LockExpireListener getLockExpireListener() { LockExpireListener lockExpireListener = new LockExpireListener() { @Override public void onExpire(String s) { log.info("[LockUtil] lock Expired! info={}", s); } }; return lockExpireListener; } }
WLock不像Redisson提供了多种类型的锁,其只提供了WDistributedLock,但同样支持互斥锁、可重入锁、公平锁及带权重优先级锁,可通过同步阻塞或者异步非阻塞方式获取到锁。所有对分布式锁的操作都通过该对象进行,在获取锁时可以传递以下参数:
waitAcquire |
是否阻塞等待获取到锁,true为阻塞,false为非阻塞 |
expireTime |
锁过期时间,单位毫秒,默认值为5分钟,最大取值5分钟,最小值5秒 |
maxWaitTime |
最长等待获取锁的时间,单位毫秒,最大值Long.MAX_VALUE |
weight |
锁权重,默认都为1,取值范围[1, 10],权重越高,获取到锁概率越高 |
renewInterval |
自动续约间隔,单位毫秒(默认为Integer.MAX_VALUE,不自动续租,最小自动续租间隔为1000ms,最大自动续租间隔不能超过过期时间,由业务控制)。 |
renewListener |
续约Listener回调 |
lockExpireListener |
锁过期Listener回调 |
watchListener |
异步监听事件回调 |
死锁问题补充:
不管是Redisson还是WLock都使用了客户端定时续约的方式延长锁过期时间,如果处理不当将造成死锁:由于加锁和锁续约在两个线程中执行,若加锁线程在释放锁之前异常退出将导致续约线程一直执行续约操作,造成死锁,此时只能使用重启进程的方式进行锁释放。所以业务在加锁处理逻辑的上层一定添加try catch 异常获,在finally逻辑中释放锁。加解锁操作参照阿里开发规范:
正例: Lock lock = new XxxLock(); // ... lock.lock(); try { doSomething(); doOthers(); } finally { lock.unlock(); } 反例: Lock lock = new XxxLock(); // ... try { // 如果此处抛出异常,则直接执行 finally 代码块 doSomething(); // 无论加锁是否成功,finally 代码块都会执行 lock.lock(); doOthers(); } finally { lock.unlock(); }
二、单机锁
1、在Java中每个对象都有一把锁,如普通的Object对象及类的Class对象。线程可以使用synchronized关键字来获取对象上的锁。synchronized关键字可以应用在方法级别(粗粒度)或代码块级别(细粒度),在JDK1.6以前,使用synchronized只有一种方式即重量级锁,而在JDK1.6以后,引入了偏向锁,轻量级锁,重量级锁,来减少竞争带来的上下文切换。
2、JUC包下提供了如下的锁:
ReentrantLock |
可重入锁,提供公平与非公平的方式 |
ReentrantReadWriteLock |
可重入读写锁,写锁类似于可重入锁,读写互斥 |
StampedLock |
邮戳锁,优化读写锁会造成写线程饥饿问题,提供了乐观读方式,读写不互斥 |
三、分布式锁
1、为什么我们需要分布式锁?
单机锁主要是为了同步同一进程中各个线程之间的操作。大多数互联网系统都是分布式部署的,当某个资源在多系统之间具有共享性的时候,为了保证大家访问这个资源数据是一致的,那么就必须要求在同一时刻只能被一个客户端处理,不能并发的执行,否者就会出现同一时刻有人写有人读,大家访问到的数据就不一致了。分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。
2、分布式锁需要具备的条件?
排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取
避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)
高可用:获取或释放锁的机制必须高可用且性能佳
3、Redisson中的RedLock:
(1)为什么需要RedLock?
上述提到的RedissonLock都是在Redis单机或主从模式下使用的,这种方式会有一个缺点:当主Redis宕机之后,从Redis还未同步保存在主Redis上的锁,此时将导致锁丢失。RedLock理论上可以解决该问题,RedLock使用多Redis节点,这样可以防止单点故障。
(2)RedLock的流程(假设有5个完全独立的redis主服务器):
- 获取当前时间戳。
- client尝试按照顺序使用相同的key,value获取所有redis服务的锁,在获取锁的过程中的获取时间比锁过期时间短很多,这是为了不要过长时间等待已经关闭的redis服务。并且试着获取下一个redis实例。比如:TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁,就放弃获取这个锁,从而尝试获取下个锁。
- client通过获取所有能获取的锁后的时间减去第一步的时间,这个时间差要小于TTL时间并且至少有3个redis(N/2+1)实例成功获取锁,才算真正的获取锁成功。
- 如果成功获取锁,则锁的真正有效时间是 TTL减去第三步的时间差 的时间;比如:TTL 是5s,获取所有锁用了2s,则真正锁有效时间为3s(其实应该再减去时钟漂移)
- 如果客户端由于某些原因获取锁失败,便会开始解锁所有redis实例;因为可能已经获取了小于3个 锁,必须释放,否则影响其他client获取锁。
Q:为什么需要获取到N/2+1个节点的响应?能不能只获取N/2+1个节点的锁而不是所有的节点?
(3)RedLock的缺点?
严重依赖时钟,如果某个Redis服务出现时钟跳跃(走的比其他机器快),那么可能会出现某个Redis节点的key提前过期,这样另外一个客户端就可能再次在N/2+1个Redis节点加锁成功(多个客户端同时获取到锁)。其实对各个Redis顺序加锁时也会导致过期时间不一致。