第一次写博客,写的不对的或者不明了的请大家多多指正
===object pool 介绍====
主要类、接口介绍:
接口 PooledObjectFactory 池对象工厂
用来创建池对象,
将不用的池对象进行钝化(passivateObject),
对要使用的池对象进行激活(activeObject),
对池对象进行验证(validateObject),
对有问题的池对象进行销毁(destroyObject)等工作
PooledObject<T> makeObject(); 生成对象
void activateObject(PooledObject<T> p); 激活对象
void destroyObject(PooledObject<T> p); 销毁对象
......
抽象类 BasePooledObjectFactory<T> implements PooledObjectFactory<T>
接口 ObjectPool : 对象池
T borrowObject(); 从池中借出一个对象
void returnObject(T obj); 归还对象
......
GenericObjectPool 创建对象池
实现的三个接口: ObjectPool 、 GenericObjectPoolMXBean 、 UsageTracking
主要问题解析:
如何实现公平锁:
pool底层维护了一个双端队列LinkedBlockingDeque
该双端队列使用ReentrantLock实现了可重入锁
因此底层可以修改ReentrantLock的公平和非公平机制实现pool的公平锁
初始化双端队列时可以设置GenericObjectPoolConfig.fairness为true来实现公平锁,默认为false(非公平锁)
双端队列LinkedBlockingDeque的长度默认为Integer.MAX_VALUE
borrowObject()怎么实现的:
从LinkedBlockingDeque双端队列中取第一个对象LinkedBlockingDeque.pollFirst(),
如果没有取到-->调用指定的factory.makeObject()-->factory.activateObject(p)-->return
如果取到了-->factory.activateObject(p)-->return
returnObject(T obj)怎么实现的:
1:锁定对象,验证对象的状态
2:factory.passivateObject(p) 钝化对象(空实现)
3:p.deallocate() 释放对象
4:如果对象池closed-->调用指定的factory.destroyObject(p);
如果对象池没有closed-->LinkedBlockingDeque.addFirst(p)
====org.apache.curator.framework包简单介绍====
主要类、接口介绍:
接口 CuratorFramework ZooKeeper客户端开源工具,主要提供了对客户端到服务的连接管理和连接重试机制,以及一些扩展功能
void start();
void close();
CuratorFrameworkState getState();
boolean isStarted();
CreateBuilder create();
......
类 ExponentialBackoffRetry 创建目录或者删除目录的重试策略
类 LockInternals 是所有申请锁与释放锁的核心实现
String attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径
boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) 循环等待尝试加锁
void releaseLock(String lockPath)
void deleteOurPath(String ourPath)
类 StandardLockInternalsDriver driver
String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) 创建节点
ourPath = client.create().creatingParentContainersIfNeeded()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path, lockNodeBytes) 创建临时顺序节点路径
类 CreateBuilderImpl client.create()方法返回的客户端实现类实例
String forPath(final String givenPath, byte[] data)
String pathInForeground(final String path, final byte[] data)
(生成目录!!)createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode)
类 InterProcessMutex 互斥锁 一个可重入锁,提供分布式锁的入口服务
锁的定义:ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 将线程对象和锁对象(线程、路径、锁的数量)关联
void acquire() 无限等待,直到获取到锁(默认time=-1,unit=null)
boolean acquire(long time, TimeUnit unit) 有限等待,在规定的时间内获取锁
1:boolean internalLock(long time, TimeUnit unit)
当前线程在缓存中是否已经存在一把锁,如果存在,将锁的次数加1直接返回
如果不存在,执行第2步
2:(主要方法!!)String LockInternals.attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径
StandardLockInternalsDriver.createsTheLock(client, path, localLockNodeBytes) 创建节点
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
在指定的节点路径下,顺序创建子节点(按照请求顺序)
注意:对每个请求都会创建一个节点
LockInternals.internalLockLoop(startMillis, millisToWait, ourPath) 循环等待尝试加锁
a、根据basePath获取子节点,按照创建时间升序排列(最先申请的在最前)
b、查看子节点中是否包含传入的节点,并且下标是否为0(acquire区分maxLeases)
如果下标为0,直接返回锁定成功,否则进行c步骤
c、启动一个自定义watcher后台监听传入的节点的状态,一旦监听到节点数据变更或删除,则就直接 notifyFromWatcher();
同时根据millisToWait(acquire方法的不同参数区分)判断是否wait
d、如果出现异常,删除节点路径
void release() 判断是否是当前线程,或者非当前线程。最终会根据线程号找到对应的path路径,然后直接删除该临时节点
LockInternals.releaseLock(String lockPath)
LockInternals.deleteOurPath(String ourPath)
client.delete().guaranteed().forPath(ourPath);
扩展:
方法 Thread.currentThread()
方法 AtomicInteger lockCount.incrementAndGet();
类 org.apache.curator.RetryLoop
=====具体实现代码(伪代码)======
=====1、初始化对象池==========
@Component("curatorPoolFactory")
public class CuratorPoolFactory extends BasePooledObjectFactory<CuratorFramework> {
@Value("#{vipProperties['zk.address']}")
private String zkHost;
@Value("#{vipProperties['zk.namespace']}")
private String zkNamespace;
@Override
public CuratorFramework create() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.namespace(zkNamespace)
.connectString(zkHost)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}
@Override
public PooledObject<CuratorFramework> wrap(CuratorFramework obj) {
return new DefaultPooledObject<CuratorFramework>(obj);
}
}
<!--zk client pool -->
<bean id="curatorPool" class="org.apache.commons.pool2.impl.GenericObjectPool"
destroy-method="close">
<constructor-arg ref="curatorPoolFactory" />
<property name="maxTotal" value="5" />
<property name="maxIdle" value="2" />
</bean>
=====2、判断是否可以获得锁======
org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool");
org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject();
org.apache.curator.CuratorZookeeperClient zookeeperClient = curatorFramework.getZookeeperClient();
org.apache.zookeeper.ZooKeeper zooKeeper = zookeeperClient.getZooKeeper();
org.apache.zookeeper.data.Stat stat = zooKeeper.(final String zkPath, Watcher watcher);
stat != null && stat.getNumChildren() > 0 ==> 已锁 直接返回
====3、可以得到锁====
org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool");
try:
org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject();
org.apache.curator.framework.recipes.locks.InterProcessMutex lock = new InterProcessMutex(curatorFramework, zkPath)
lock.acquire();
#DO SOMETHING#
finally:
lock.release();
curatorPool.returnObject(curatorFramework);
总结:
实现分布式的重点是Stat类
实现锁的中点是InterProcessMutex类