前文如上:
39.【面试宝典】面试宝典-redis过期k值回收策略,缓存淘汰策略
合集参考:面试宝典
书接上回,上次复习了redis分布式锁,今天复习一下两外两种实现:基于数据库的实现方式和基于缓存(Redis等)实现分布式锁
合集参考: 面试宝典
文档参考:
分布式锁
1.背景
在日常开发中,单体应用都在同一个JVM环境中。一个JVM进程中当有多个线程的去竞争某一个资源的时候,我们通常会用一把锁来保证只有一个线程获取到资源。如:synchronize关键字或ReentrantLock锁等操作。
但是,随着业务的增长,单体应用存在性能瓶颈。随着微服务架构的成熟及普及,单体应用可能会拆分成多个微服务应用。
当多个应用服务同时对同一条数据做修改,JVM层面的锁就不适用了。分布式高并发场景下,如何要确保数据的正确性,如何保证只有一个应用能够修改成功?这里就诞生了分布式场景下的锁,即分布式锁。
2.分布式锁应该具备哪些条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
- 高可用、高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁失效机制、防止死锁;
- 具备非阻塞锁特性,即没有获取到锁直接返回获取锁失败;
3.实现方案
分布式锁三种实现方式:
- 基于数据库实现分布式锁;
- 基于缓存(Redis等)实现分布式锁;
- 基于Zookeeper实现分布式锁。
三种方案的比较
- 从理解的难易程度(从低到高):数据库 > 缓存 > Zookeeper
- 从实现的复杂性角度(从低到高):Zookeeper >= 缓存 > 数据库
- 从性能角度(从高到低):缓存 > Zookeeper >= 数据库
- 从可靠性角度(从高到低):Zookeeper > 缓存 > 数据库 基于数据库实现分布式锁
4.基于数据库的实现方式
4.1 原理
在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
4.2 实现
1.创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引
DROP TABLE IF EXISTS `method_lock`; CREATE TABLE `method_lock` ( `id` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键', `method_name` VARCHAR(64) NOT NULL COMMENT '锁定的方法名', `desc` VARCHAR(255) NOT NULL COMMENT '备注信息', `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE ) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = '锁定中的方法';
- 想要执行某个方法,就使用这个方法名向表中插入数据
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
由于我们对method_name做了唯一性约束,如果有多个请求同时提交插入操作时,数据库能确保只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体中的内容。
3、执行完成后,删除对应的行数据释放锁
delete from method_lock where method_name ='methodName';
4.3 问题及优化
这里只是基于数据库实现的一种方法(比较粗的一种)。 但是对于分布式锁应该具备的条件来说,还有一些问题需要解决及优化:
- 可用性: 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能。(优化:数据库需要双机部署、数据同步、主备切换;)
- 可重入性: 它不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据。(优化:在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器线程相同,若相同则直接获取锁。)
- 锁失效机制:这把锁没有失效时间,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,(优化:在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据)
- 非阻塞:这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。 (优化:所以需要优化获取逻辑,循环多次去获取)
- 性能:依赖数据库需要一定的资源开销,性能问题需要考虑;
4.4基于数据库的排它锁
除了可以通过增删操作数据表中的记录以外,其实还可以借助数据库中自带的锁来实现分布式的锁。
我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:
public void unlock(){ connection.commit(); }
通过connection.commit()操作来释放锁。 这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。阻塞锁:for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。 锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。 但是还是无法直接解决数据库单点和可重入问题。
乐观锁
乐观锁假设认为数据一般情况下不会造成冲突,只有在进行数据的提交更新时,才会检测数据的冲突情况,如果发现冲突了,则返回错误信息
实现方式:
时间戳(timestamp)记录机制实现:给数据库表增加一个时间戳字段类型的字段,当读取数据时,将timestamp字段的值一同读出,数据每更新一次,timestamp也同步更新。当对数据做提交更新操作时,检查当前数据库中数据的时间戳和自己更新前取到的时间戳进行对比,若相等,则更新,否则认为是失效数据。 若出现更新冲突,则需要上层逻辑修改,启动重试机制 同样也可以使用version的方式。
性能对比
(1) 悲观锁实现方式是独占数据,其它线程需要等待,不会出现修改的冲突,能够保证数据的一致性,但是依赖数据库的实现,且在线程较多时出现等待造成效率降低的问题。一般情况下,对于数据很敏感且读取频率较低的场景,可以采用悲观锁的方式
(2) 乐观锁可以多线程同时读取数据,若出现冲突,也可以依赖上层逻辑修改,能够保证高并发下的读取,适用于读取频率很高而修改频率较少的场景
(3) 由于库存回写数据属于敏感数据且读取频率适中,所以建议使用悲观锁优化
4.5 优缺点
数据库实现分布式锁的优点: 直接借助数据库,容易理解。
数据库实现分布式锁的缺点: 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。
操作数据库需要一定的开销,性能问题需要考虑。
5.基于zookeeper的实现方式
5.1原理
基于Zookeeper临时有序节点同样可以实现分布式锁。
大致思想为:每个客户端对某个方法加锁时,在zookeeper上的该方法对应的指定节点目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。如果获取到比自己小的兄弟节点不存在,则说明当前线程顺序号最小,获得锁。 如果判断自己不是那最小的一个节点,则设置监听比自己次小的节点; 如果已处理完成,则删除自己的节点。同时,其可以避免服务宕机导 致的锁无法释放,而产生的死锁问题。
5.2实现
临时顺序节点实现分布式锁
在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面,会加上一个次序编号,而这个生成的次序编号,是上一个生成的次序编号加一。
例如,有一个用于发号的节点“/test/lock”为父亲节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,假定相同的前缀为“/test/lock/seq-”。第一个创建的子节点基本上应该为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推。
- 一个ZooKeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程,都在这个节点下创建个临时顺序节点。由于ZK节点,是按照创建的次序,依次递增的。
- 为了确保公平,可以简单的规定:编号最小的那个节点,表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁,也就是谁先创建的节点,谁获取锁。
- ZooKeeper的节点监听机制,可以保障占有锁的传递有序而且高效。 每个线程抢占锁之前,先尝试创建自己的ZNode。同样,释放锁的时候,就需要删除创建的Znode。创建成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode的通知就可以了。前一个Znode删除的时候,会触发Znode事件,当前节点能监听到删除事件,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
- ZooKeeper的节点监听机制,能够非常完美地实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听(linsten)或者监视(watch)排号在自己前面那个,而且紧挨在自己前面的那个节点,就能收到其删除事件了。
只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。- ZooKeeper的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode锁的客户端与ZooKeeper集群服务器失去联系,这个临时Znode也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。正是由于这个原因,在创建取号节点的时候,尽量创建临时znode节点。
原理如图:
5.3. 使用Curator实现分布式锁
Curator是Netfix公司开源的一套ZooKeeper客户端框架,对zk底层的连接、监听等进行了良好的封装,并且还提供了分布式锁API,因此我们不必自己实现上述复杂的理论,直接使用curator框架即可。
Curator分布式锁是一种可重入锁,实现了分布式的AQS,使用ConcurrentMap实现了一个类似ThreadLocal的功能,把线程(Thread.currentThread())作为key,锁作为value,在加锁时,如果在key中找到该线程,就对value加1,解锁时减1,减到0的时候删除临时节点、移除map中的该线程。
首先定义锁接口:
package utils.distributed.lock; public interface DistributedLock { /** * 阻塞式锁 * @return */ void lock(); /** * 非阻塞式锁 * @return */ boolean tryLock(); /** * 带超时时间的阻塞式锁 * @param timeout * @return */ boolean tryLock(long timeout); /** * 解锁 */ void unLock() throws Exception; /** * 释放资源 */ void shutdown(); }
然后实现一个zk锁:
package utils.distributed.lock; import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * zk分布式锁 */ public class ZKLock implements DistributedLock { private CuratorFramework client; private InterProcessMutex lock; public ZKLock(String host, String bizType, String lockKey) { client = CuratorFrameworkFactory.newClient(host, new ExponentialBackoffRetry(ZKLockConstant.BASE_SLEEP_TIME_MS, ZKLockConstant.MAX_RETRIES)); client.start(); String path = ZKLockConstant.ZK_SEPERATOR + StringUtils.join(Arrays.asList(ZKLockConstant.ZK_LOCK_BASE_PREFIX, bizType, lockKey), ZKLockConstant.ZK_SEPERATOR); lock = new InterProcessMutex(client, path); } public void lock() { try { lock.acquire(); } catch (Exception e) { e.printStackTrace(); } } public boolean tryLock() { return tryLock(0); } public boolean tryLock(long timeout) { try { return lock.acquire(timeout, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); return false; } } public void unLock() throws Exception { lock.release(); } public void shutdown() { client.close(); } }
5.4. 测试
1 测试lock
package mytest.distributed.lock; import utils.distributed.lock.ZKLock; import java.util.concurrent.CountDownLatch; public class ZkLockTest { public static void main(String[] args) { String zkHost = "192.168.160.128:2181"; String bizType = "test"; String lockKey = "testZkLock"; ZKLock zkLock = new ZKLock(zkHost, bizType, lockKey); // 启动3个线程模拟分布式锁竞争 CountDownLatch conutDownLatch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { new Thread(() -> { testZkLock(zkLock); conutDownLatch.countDown(); }).start(); } try { conutDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } zkLock.shutdown(); } private static void testZkLock(ZKLock zkLock) { System.out.println("######## 开始加锁,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); zkLock.lock(); try { System.out.println("######## 加锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } finally { try { zkLock.unLock(); System.out.println("######## 解锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } catch (Exception e) { System.out.println("######## 解锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } } } }
打印结果:
全部成功了。
2 测试tryLock()
复用上面的main方法,将testZkLock换成testZkLockTryLock:
private static void testZkLockTryLock(ZKLock zkLock) { System.out.println("######## 开始加锁,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); if (!zkLock.tryLock()) { System.out.println("######## 加锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); return; } try { System.out.println("######## 加锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } finally { try { zkLock.unLock(); System.out.println("######## 解锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } catch (Exception e) { System.out.println("######## 解锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } } }
打印结果:
因为只尝试一次,所以有失败的情况发生。
3 测试tryLock(timeout)
private static void testZkLockTryLockTimeOut(ZKLock zkLock) { System.out.println("######## 开始加锁,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); if (!zkLock.tryLock(3000)) { System.out.println("######## 加锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); return; } try { System.out.println("######## 加锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } finally { try { zkLock.unLock(); System.out.println("######## 解锁成功,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } catch (Exception e) { System.out.println("######## 解锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); } } }
打印结果:
线程18获取锁之后要持有5s才会释放,而线程16、17加锁等待时间只有3s,因此会超时。
5.5.分布式锁注解
实现一个基于注解的分布式锁:
1 定义分布式锁注解
package utils.distributed.annotation; import java.lang.annotation.*; /** * ZK分布式锁注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ZkLock { /** * zk地址 * @return */ String zkHost(); /** * 业务类型 */ String bizType(); /** * 锁名称 * * @return */ String lockKey(); /** * 超时时间 * @return */ long timeout(); } 复制代码
4.2 定义分布式锁切面
package utils.distributed.advice; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import utils.distributed.annotation.ZkLock; import utils.distributed.lock.ZKLock; /** * @Description zk分布式锁切面 * @Author lilong * @Date 2019-04-08 13:50 */ @Component @Aspect public class ZkLockAspectAdvice { @Around(value = "@annotation(utils.distributed.annotation.ZkLock) && @annotation(zkLock)") public Object process(ProceedingJoinPoint pjp, ZkLock zkLock) throws Throwable { ZKLock lock = new ZKLock(zkLock.zkHost(), zkLock.bizType(), zkLock.lockKey()); boolean acquired = false; try { acquired = lock.tryLock(zkLock.timeout()); if (acquired) { return pjp.proceed(); } else { System.out.println("######## 加锁失败,线程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName()); return null; } } finally { if (acquired) { lock.unLock(); } } } }
2 使用注解
@Override @ZkLock(zkHost = "192.168.160.128:2181", bizType = "test", lockKey = "queryKeyValue", timeout = 3000) public KeyValueJsonPO queryKeyValue(String bizType, String key) { return keyValueJsonPOMapper.queryKeyValue(bizType, key); }
5.6 问题
- 锁失效机制:使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉( Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
- 非阻塞:使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
- 可重入性:使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的 时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
- 可用性:使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。 可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
5.7 优缺点
优点
具备高可用、可重入、阻塞锁特性、可解决失效死锁问题。实现起来较为简单
缺点
Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。需要对ZK的原理有所了解。
公众号,感谢关注