分布式锁-zookeeper-lock

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

第一次写博客,写的不对的或者不明了的请大家多多指正14.gif

===object pool 介绍====

主要类、接口介绍:

接口 PooledObjectFactory 池对象工厂

用来创建池对象, 

将不用的池对象进行钝化(passivateObject), 

对要使用的池对象进行激活(activeObject), 

对池对象进行验证(validateObject), 

对有问题的池对象进行销毁(destroyObject)等工作

PooledObject<T> makeObject(); 生成对象

void activateObject(PooledObject<T> p); 激活对象

void destroyObject(PooledObject<T> p); 销毁对象

......

抽象类 BasePooledObjectFactory<T> implements PooledObjectFactory<T>

接口 ObjectPool : 对象池

T borrowObject(); 从池中借出一个对象

void returnObject(T obj); 归还对象

......

GenericObjectPool 创建对象池

实现的三个接口: ObjectPool 、 GenericObjectPoolMXBean 、 UsageTracking


主要问题解析:

如何实现公平锁:

pool底层维护了一个双端队列LinkedBlockingDeque

该双端队列使用ReentrantLock实现了可重入锁

因此底层可以修改ReentrantLock的公平和非公平机制实现pool的公平锁

初始化双端队列时可以设置GenericObjectPoolConfig.fairness为true来实现公平锁,默认为false(非公平锁)

双端队列LinkedBlockingDeque的长度默认为Integer.MAX_VALUE

borrowObject()怎么实现的:

从LinkedBlockingDeque双端队列中取第一个对象LinkedBlockingDeque.pollFirst(),

如果没有取到-->调用指定的factory.makeObject()-->factory.activateObject(p)-->return 

如果取到了-->factory.activateObject(p)-->return

returnObject(T obj)怎么实现的:

1:锁定对象,验证对象的状态

2:factory.passivateObject(p) 钝化对象(空实现)

3:p.deallocate() 释放对象

4:如果对象池closed-->调用指定的factory.destroyObject(p);

  如果对象池没有closed-->LinkedBlockingDeque.addFirst(p)


====org.apache.curator.framework包简单介绍====

主要类、接口介绍:

接口 CuratorFramework ZooKeeper客户端开源工具,主要提供了对客户端到服务的连接管理和连接重试机制,以及一些扩展功能

void start();

void close();

CuratorFrameworkState getState();

boolean isStarted();

CreateBuilder create();

......

类 ExponentialBackoffRetry 创建目录或者删除目录的重试策略

类 LockInternals 是所有申请锁与释放锁的核心实现

String attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径

boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) 循环等待尝试加锁

void releaseLock(String lockPath)

void deleteOurPath(String ourPath)

类 StandardLockInternalsDriver driver

String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) 创建节点

ourPath = client.create().creatingParentContainersIfNeeded() 

.withProtection()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(path, lockNodeBytes) 创建临时顺序节点路径

类 CreateBuilderImpl client.create()方法返回的客户端实现类实例

String forPath(final String givenPath, byte[] data)

String pathInForeground(final String path, final byte[] data)

(生成目录!!)createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode)

类 InterProcessMutex 互斥锁 一个可重入锁,提供分布式锁的入口服务

锁的定义:ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 将线程对象和锁对象(线程、路径、锁的数量)关联

void acquire() 无限等待,直到获取到锁(默认time=-1,unit=null)

boolean acquire(long time, TimeUnit unit) 有限等待,在规定的时间内获取锁

1:boolean internalLock(long time, TimeUnit unit)

当前线程在缓存中是否已经存在一把锁,如果存在,将锁的次数加1直接返回

如果不存在,执行第2步

2:(主要方法!!)String LockInternals.attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径

StandardLockInternalsDriver.createsTheLock(client, path, localLockNodeBytes) 创建节点

client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

在指定的节点路径下,顺序创建子节点(按照请求顺序)

注意:对每个请求都会创建一个节点

LockInternals.internalLockLoop(startMillis, millisToWait, ourPath) 循环等待尝试加锁

a、根据basePath获取子节点,按照创建时间升序排列(最先申请的在最前)

b、查看子节点中是否包含传入的节点,并且下标是否为0(acquire区分maxLeases)

如果下标为0,直接返回锁定成功,否则进行c步骤

c、启动一个自定义watcher后台监听传入的节点的状态,一旦监听到节点数据变更或删除,则就直接 notifyFromWatcher();

同时根据millisToWait(acquire方法的不同参数区分)判断是否wait

d、如果出现异常,删除节点路径

void release() 判断是否是当前线程,或者非当前线程。最终会根据线程号找到对应的path路径,然后直接删除该临时节点

LockInternals.releaseLock(String lockPath)

LockInternals.deleteOurPath(String ourPath)

client.delete().guaranteed().forPath(ourPath);

扩展:

方法 Thread.currentThread()

方法 AtomicInteger lockCount.incrementAndGet();

类 org.apache.curator.RetryLoop 


=====具体实现代码(伪代码)======

=====1、初始化对象池==========

@Component("curatorPoolFactory")
public class CuratorPoolFactory extends BasePooledObjectFactory<CuratorFramework> {
    @Value("#{vipProperties['zk.address']}")
    private String zkHost;

    @Value("#{vipProperties['zk.namespace']}")
    private String zkNamespace;
	
    @Override
    public CuratorFramework create() throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .namespace(zkNamespace)
                .connectString(zkHost)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }
    @Override
    public PooledObject<CuratorFramework> wrap(CuratorFramework obj) {
        return new DefaultPooledObject<CuratorFramework>(obj);
    }
}

<!--zk client pool -->
<bean id="curatorPool" class="org.apache.commons.pool2.impl.GenericObjectPool"
	destroy-method="close">
	<constructor-arg ref="curatorPoolFactory" />
	<property name="maxTotal" value="5" />
	<property name="maxIdle" value="2" />
</bean>

=====2、判断是否可以获得锁======

org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool");
org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject();
org.apache.curator.CuratorZookeeperClient zookeeperClient = curatorFramework.getZookeeperClient();
org.apache.zookeeper.ZooKeeper zooKeeper = zookeeperClient.getZooKeeper();
org.apache.zookeeper.data.Stat stat = zooKeeper.(final String zkPath, Watcher watcher);
stat != null && stat.getNumChildren() > 0 ==> 已锁 直接返回

====3、可以得到锁====

org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool");

try:
	org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject();
	org.apache.curator.framework.recipes.locks.InterProcessMutex lock = new InterProcessMutex(curatorFramework, zkPath) 
	lock.acquire();
	#DO SOMETHING#
finally:
	lock.release();
	curatorPool.returnObject(curatorFramework);


总结:

实现分布式的重点是Stat类

实现锁的中点是InterProcessMutex类



2f9b1173e4c728732ac2934803c5d9452c416e97




相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
13天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
30 2
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
48 0
|
2月前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
64 0
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
298 0
|
3月前
|
监控 前端开发 Java
JAVAEE分布式技术之Zookeeper技术
JAVAEE分布式技术之Zookeeper技术
16 0
JAVAEE分布式技术之Zookeeper技术
|
3月前
|
NoSQL 测试技术 Redis
Zookeeper实现分布式锁
ZooKeeper是一个分布式协调服务,其中提供的序列化、持久化、有层次的目录结构使得它非常适合用于实现分布式锁。在ZooKeeper中,分布式锁通常通过临时有序节点实现
|
3月前
|
存储 算法 Java
【分布式】Zookeeper 使用环境搭建
【1月更文挑战第25天】【分布式】Zookeeper 使用环境搭建
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
61 0