1.Curator简介
官网的说法:curator是一个Java/JVM客户端库,用于zookeeper,一个分布式协调服务。它包括一个高级API框架和实用程序,使ApacheZooKeeper的使用更加简单和可靠。它还包括常见用例和扩展的方法,如服务发现和Java8异步DSL。
官方使用文档:Apache Curator –
个人使用手册:Curator使用手册 - 腾讯云开发者社区-
腾讯云
配置zookeeper集群参考;https://blog.csdn.net/m0_63748493/article/details/125776295
2.共享锁和排他锁
排它锁,又称独占锁,独享锁 synchronized就是一个排它锁
共享锁,又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业获取到共享锁,也可以查看但是 无法修改和 删除数据
共享锁和排它锁典型是ReentranReadWriteLock 其中,读锁是共享锁,写锁是 排它锁
要么多读,要么一写,二者不可共存
3.实现共享锁
1.IDEA创建maven项目,pom.xml添加如下依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>curator</groupId>
<artifactId>curator</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>cn.itlym.shoulder</groupId>
<artifactId>lombok</artifactId>
<version>0.1</version>
</dependency>
2.鼠标右击,重新构建项目
3.创建CuratorCRUD类,和ZKShareLock类
public class CuratorCRUD {
public CuratorFramework curatorFramework;
public String ip="192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183";
public CuratorCRUD() {
createZkCuratorConnection();
}
/**
* 创建客户端连接
*/
public void createZkCuratorConnection(){
curatorFramework=CuratorFrameworkFactory
.builder()
.connectString(ip)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
curatorFramework.start();
}
/**
* 关闭客户端连接
*/
public void deleteZkCuratorConnection(){
curatorFramework.close();
}
}
public class ZKShareLock extends Thread{
private Object o=new Object();
private CuratorFramework curatorFramework;
private String basePath="/ShareLocks";
private String userName=basePath+"/User-";
private String cname;//客户端名字
public ZKShareLock(CuratorFramework curatorFramework,String cname) {
this.curatorFramework = curatorFramework;
this.cname=cname;
}
@Override
public void run() {
try {
//创建节点并获取节点名字
//得到完整目录/ShareLocks/User-0000000092
String nodeName = curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(userName, cname.getBytes());
System.out.println("创捷节点成功");
System.out.println(nodeName);
System.out.println();
//获取目录下子节点
List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
List<String> nodeNames =new ArrayList();
for (int i = 0; i < tempNodeNames.size(); i++) {
String name =tempNodeNames.get(i);
name=basePath+"/"+name;
nodeNames.add(name);
}
Collections.sort(nodeNames);
int index=nodeNames.indexOf(nodeName);
System.out.printf("index =%d \n",index);
if(index==0){
doSomthings(nodeName);
unlock(nodeName);
}else {
addWatcherWithTreeCache(nodeNames.get(index-1));
synchronized (o){
o.wait();
}
doSomthings(nodeName);
unlock(nodeName);
}
} catch (Exception e) {
e.printStackTrace();
}
curatorFramework.close();
}
//添加监听器
public void addWatcherWithTreeCache(String path) throws Exception {
TreeCache treeCache=new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListene=new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
System.out.printf("%s delete\n",path);
synchronized (o){
o.notify();
}
System.out.printf("%s notify\n",path);
}
}
};
treeCache.getListenable().addListener(treeCacheListene);
treeCache.start();
}
public void unlock(String nodeName){
System.out.printf("%s unlock",nodeName);
}
public void doSomthings(String nodeName){
System.out.printf("%s doSomthings",nodeName);
}
public static void main(String[] args) {
Date date=new Date();
for (int i = 0; i < 100; i++) {
new ZKShareLock(new CuratorCRUD().curatorFramework,"用户 "+i).start();
}
Date date1=new Date();
System.out.println(date1.getTime()-date.getTime());
}
}
运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip
部分运行结果:
4.实现排他锁
Curator中封装了一种分布式可重入排他锁:InterProcessMutex
创建CuratorMutex类,并在zookeeper中创建/Mutex节点
public class CuratorMutex implements Runnable{
public CuratorFramework curatorFramework;
public String basePath;
public InterProcessLock processLock;
public int idx;
public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
this.curatorFramework = curatorFramework;
this.basePath = basePath;
this.processLock=new InterProcessMutex(curatorFramework,basePath);
this.idx=idx;
}
@Override
public void run() {
Logger logger= Logger.getLogger("");
try {
//线程加锁
processLock.acquire(1000, TimeUnit.SECONDS);
logger.info(String.format("线程%d获取锁",idx));
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}finally {
//线程解锁
try {
processLock.release();
} catch (Exception e) {
e.printStackTrace();
}
logger.info(String.format("线程%d释放锁",idx));
}
}
public static void main(String[] args) {
CuratorFramework framework = new CuratorCRUD().curatorFramework;
for (int i = 0; i < 20; i++) {
new Thread(new CuratorMutex(framework,"/Mutex",i)).start();
}
while (true){
}
}
}
部分运行结果:
下面将从源码里带大家讲解如何实现可重入排他锁
acquire方法内部实际实际上调用了internalLock方法
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
//获取当前线程,并获取LockData锁信息
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
//lockCount自增,锁重入
lockData.lockCount.incrementAndGet();
return true;
}
//获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//创建锁,并将锁信息存放到threadData这个Map中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//减少重入次数
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{ //释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{ //从Map中移除该线程
threadData.remove(currentThread);
}
}